diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index d847c5fe3e..bca2b8ded1 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -27,6 +27,7 @@ import io.reactivex.internal.fuseable.*; import io.reactivex.internal.operators.completable.CompletableFromPublisher; import io.reactivex.internal.operators.flowable.*; +import io.reactivex.internal.operators.maybe.MaybeFromPublisher; import io.reactivex.internal.operators.observable.ObservableFromPublisher; import io.reactivex.internal.operators.single.SingleFromPublisher; import io.reactivex.internal.schedulers.ImmediateThinScheduler; @@ -13598,6 +13599,12 @@ public final Single toSingle() { return RxJavaPlugins.onAssembly(new SingleFromPublisher(this)); } + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe toMaybe() { + return new MaybeFromPublisher(this); + } + /** * Returns a Flowable that emits a list that contains the items emitted by the source Publisher, in a * sorted order. Each item emitted by the Publisher must implement {@link Comparable} with respect to all diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java new file mode 100644 index 0000000000..296dd6b4a7 --- /dev/null +++ b/src/main/java/io/reactivex/Maybe.java @@ -0,0 +1,850 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.concurrent.*; + +import org.reactivestreams.*; + +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.*; +import io.reactivex.internal.operators.maybe.*; +import io.reactivex.internal.subscribers.maybe.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +/** + * Represents a deferred computation and emission of a maybe value or exception. + * + * @param the value type + */ +public abstract class Maybe implements MaybeSource { + static Maybe wrap(MaybeSource source) { + ObjectHelper.requireNonNull(source, "source is null"); + if (source instanceof Maybe) { + return (Maybe)source; + } + return new MaybeFromUnsafeSource(source); + } + + public static Maybe amb(final Iterable> sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); + return new MaybeAmbIterable(sources); + } + + @SuppressWarnings("unchecked") + public static Maybe amb(final MaybeSource... sources) { + if (sources.length == 0) { + return Maybe.complete(); + } + if (sources.length == 1) { + return wrap((MaybeSource)sources[0]); + } + return new MaybeAmbArray(sources); + } + + @SuppressWarnings("unchecked") + public static Maybe complete() { + return (Maybe) MaybeComplete.INSTANCE; + } + + public static Flowable concat(Iterable> sources) { + return concat(Flowable.fromIterable(sources)); + } + + public static Flowable concat(Flowable> sources) { // FIXME Publisher + return sources.concatMap(new Function, Publisher>() { + @Override + public Publisher apply(MaybeSource v){ + return new MaybeToFlowable(v); + } + }); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + return concat(Flowable.fromArray(s1, s2)); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2, + MaybeSource s3 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + return concat(Flowable.fromArray(s1, s2, s3)); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + return concat(Flowable.fromArray(s1, s2, s3, s4)); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + return concat(Flowable.fromArray(s1, s2, s3, s4, s5)); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + return concat(Flowable.fromArray(s1, s2, s3, s4, s5, s6)); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + return concat(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7)); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7, MaybeSource s8 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + ObjectHelper.requireNonNull(s8, "s8 is null"); + return concat(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8)); + } + + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7, MaybeSource s8, + MaybeSource s9 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + ObjectHelper.requireNonNull(s8, "s8 is null"); + ObjectHelper.requireNonNull(s9, "s9 is null"); + return concat(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9)); + } + + public static Maybe create(MaybeSource source) { + ObjectHelper.requireNonNull(source, "source is null"); + // TODO plugin wrapper + return new MaybeFromSource(source); + } + + public static Maybe unsafeCreate(MaybeSource source) { + ObjectHelper.requireNonNull(source, "source is null"); + // TODO plugin wrapper + return new MaybeFromUnsafeSource(source); + } + + public static Maybe defer(final Callable> maybeSupplier) { + ObjectHelper.requireNonNull(maybeSupplier, "maybeSupplier is null"); + return new MaybeDefer(maybeSupplier); + } + + public static Maybe error(final Callable errorSupplier) { + ObjectHelper.requireNonNull(errorSupplier, "errorSupplier is null"); + return new MaybeError(errorSupplier); + } + + public static Maybe error(final Throwable error) { + ObjectHelper.requireNonNull(error, "error is null"); + return error(new Callable() { + @Override + public Throwable call() { + return error; + } + }); + } + + public static Maybe fromCallable(final Callable callable) { + ObjectHelper.requireNonNull(callable, "callable is null"); + return new MaybeFromCallable(callable); + } + + public static Maybe fromFuture(Future future) { + return Flowable.fromFuture(future).toMaybe(); + } + + public static Maybe fromFuture(Future future, long timeout, TimeUnit unit) { + return Flowable.fromFuture(future, timeout, unit).toMaybe(); + } + + public static Maybe fromFuture(Future future, long timeout, TimeUnit unit, Scheduler scheduler) { + return Flowable.fromFuture(future, timeout, unit, scheduler).toMaybe(); + } + + public static Maybe fromFuture(Future future, Scheduler scheduler) { + return Flowable.fromFuture(future, scheduler).toMaybe(); + } + + public static Maybe fromPublisher(final Publisher publisher) { + ObjectHelper.requireNonNull(publisher, "publisher is null"); + return new MaybeFromPublisher(publisher); + } + + public static Maybe just(final T value) { + ObjectHelper.requireNonNull(value, "value is null"); + return new MaybeJust(value); + } + + public static Flowable merge(Iterable> sources) { + return merge(Flowable.fromIterable(sources)); + } + + public static Flowable merge(Flowable> sources) { // FIXME Publisher + return sources.flatMap(new Function, Publisher>() { + @Override + public Publisher apply(MaybeSource v){ + return new MaybeToFlowable(v); + } + }); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static Maybe merge(MaybeSource> source) { + ObjectHelper.requireNonNull(source, "source is null"); + return new MaybeFlatMap, T>(source, (Function)Functions.identity()); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + return merge(Flowable.fromArray(s1, s2)); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2, + MaybeSource s3 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + return merge(Flowable.fromArray(s1, s2, s3)); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + return merge(Flowable.fromArray(s1, s2, s3, s4)); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + return merge(Flowable.fromArray(s1, s2, s3, s4, s5)); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + return merge(Flowable.fromArray(s1, s2, s3, s4, s5, s6)); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + return merge(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7)); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7, MaybeSource s8 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + ObjectHelper.requireNonNull(s8, "s8 is null"); + return merge(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8)); + } + + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7, MaybeSource s8, + MaybeSource s9 + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + ObjectHelper.requireNonNull(s8, "s8 is null"); + ObjectHelper.requireNonNull(s9, "s9 is null"); + return merge(Flowable.fromArray(s1, s2, s3, s4, s5, s6, s7, s8, s9)); + } + + @SuppressWarnings("unchecked") + public static Maybe never() { + return (Maybe) MaybeNever.INSTANCE; + } + + public static Maybe timer(long delay, TimeUnit unit) { + return timer(delay, unit, Schedulers.computation()); + } + + public static Maybe timer(final long delay, final TimeUnit unit, final Scheduler scheduler) { + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return new MaybeTimer(delay, unit, scheduler); + } + + public static Maybe equals(final MaybeSource first, final MaybeSource second) { // NOPMD + ObjectHelper.requireNonNull(first, "first is null"); + ObjectHelper.requireNonNull(second, "second is null"); + return new MaybeEquals(first, second); + } + + public static Maybe using(Callable resourceSupplier, + Function> maybeFunction, Consumer disposer) { + return using(resourceSupplier, maybeFunction, disposer, true); + } + + public static Maybe using( + final Callable resourceSupplier, + final Function> maybeFunction, + final Consumer disposer, + final boolean eager) { + ObjectHelper.requireNonNull(resourceSupplier, "resourceSupplier is null"); + ObjectHelper.requireNonNull(maybeFunction, "maybeFunction is null"); + ObjectHelper.requireNonNull(disposer, "disposer is null"); + + return new MaybeUsing(resourceSupplier, maybeFunction, disposer, eager); + } + + public static Maybe zip(final Iterable> sources, Function zipper) { + ObjectHelper.requireNonNull(sources, "sources is null"); + return Flowable.zipIterable(MaybeInternalHelper.iterableToFlowable(sources), zipper, false, 1).toMaybe(); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + BiFunction zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, + Function3 zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2, s3); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + Function4 zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2, s3, s4); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, + Function5 zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2, s3, s4, s5); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + Function6 zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2, s3, s4, s5, s6); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7, + Function7 zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2, s3, s4, s5, s6, s7); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7, MaybeSource s8, + Function8 zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + ObjectHelper.requireNonNull(s8, "s8 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2, s3, s4, s5, s6, s7, s8); + } + + @SuppressWarnings("unchecked") + public static Maybe zip( + MaybeSource s1, MaybeSource s2, + MaybeSource s3, MaybeSource s4, + MaybeSource s5, MaybeSource s6, + MaybeSource s7, MaybeSource s8, + MaybeSource s9, + Function9 zipper + ) { + ObjectHelper.requireNonNull(s1, "s1 is null"); + ObjectHelper.requireNonNull(s2, "s2 is null"); + ObjectHelper.requireNonNull(s3, "s3 is null"); + ObjectHelper.requireNonNull(s4, "s4 is null"); + ObjectHelper.requireNonNull(s5, "s5 is null"); + ObjectHelper.requireNonNull(s6, "s6 is null"); + ObjectHelper.requireNonNull(s7, "s7 is null"); + ObjectHelper.requireNonNull(s8, "s8 is null"); + ObjectHelper.requireNonNull(s9, "s9 is null"); + return zipArray(Functions.toFunction(zipper), s1, s2, s3, s4, s5, s6, s7, s8, s9); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public static Maybe zipArray(Function zipper, MaybeSource... sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); + Publisher[] sourcePublishers = new Publisher[sources.length]; + int i = 0; + for (MaybeSource s : sources) { + ObjectHelper.requireNonNull(s, "The " + i + "th source is null"); + sourcePublishers[i] = new MaybeToFlowable(s); + i++; + } + return Flowable.zipArray(zipper, false, 1, sourcePublishers).toMaybe(); + } + + @SuppressWarnings("unchecked") + public final Maybe ambWith(MaybeSource other) { + ObjectHelper.requireNonNull(other, "other is null"); + return amb(this, other); + } + + public final Maybe asMaybe() { + return new MaybeHide(this); + } + + public final Maybe compose(Function, ? extends MaybeSource> convert) { + return wrap(to(convert)); + } + + public final Maybe cache() { + return new MaybeCache(this); + } + + public final Maybe cast(final Class clazz) { + ObjectHelper.requireNonNull(clazz, "clazz is null"); + return map(new Function() { + @Override + public U apply(T v) { + return clazz.cast(v); + } + }); + } + + public final Flowable concatWith(MaybeSource other) { + return concat(this, other); + } + + public final Maybe delay(long time, TimeUnit unit) { + return delay(time, unit, Schedulers.computation()); + } + + public final Maybe delay(final long time, final TimeUnit unit, final Scheduler scheduler) { + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return new MaybeDelay(this, time, unit, scheduler); + } + + public final Maybe delaySubscription(CompletableSource other) { + return new MaybeDelayWithCompletable(this, other); + } + + public final Maybe delaySubscription(MaybeSource other) { + return new MaybeDelayWithMaybe(this, other); + } + + public final Maybe delaySubscription(ObservableSource other) { + return new MaybeDelayWithObservable(this, other); + } + + public final Maybe delaySubscription(Publisher other) { + return new MaybeDelayWithPublisher(this, other); + } + + public final Maybe delaySubscription(long time, TimeUnit unit) { + return delaySubscription(time, unit, Schedulers.computation()); + } + + public final Maybe delaySubscription(long time, TimeUnit unit, Scheduler scheduler) { + return delaySubscription(Observable.timer(time, unit, scheduler)); + } + + public final Maybe doOnSubscribe(final Consumer onSubscribe) { + ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); + return new MaybeDoOnSubscribe(this, onSubscribe); + } + + public final Maybe doOnSuccess(final Consumer onSuccess) { + ObjectHelper.requireNonNull(onSuccess, "onSuccess is null"); + return new MaybeDoOnSuccess(this, onSuccess); + } + + public final Maybe doOnComplete(final Action onComplete) { + ObjectHelper.requireNonNull(onComplete, "onComplete is null"); + return new MaybeDoOnComplete(this, onComplete); + } + + public final Maybe doOnError(final Consumer onError) { + ObjectHelper.requireNonNull(onError, "onError is null"); + return new MaybeDoOnError(this, onError); + } + + public final Maybe doOnCancel(final Action onCancel) { + ObjectHelper.requireNonNull(onCancel, "onCancel is null"); + return new MaybeDoOnCancel(this, onCancel); + } + + public final Maybe flatMap(Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return new MaybeFlatMap(this, mapper); + } + + public final Flowable flatMapPublisher(Function> mapper) { + return toFlowable().flatMap(mapper); + } + + public final T get(T valueIfComplete) { + return MaybeAwait.get(this, valueIfComplete); + } + + public final Maybe lift(final MaybeOperator onLift) { + ObjectHelper.requireNonNull(onLift, "onLift is null"); + return new MaybeLift(this, onLift); + } + + public final Maybe map(Function mapper) { + return new MaybeMap(this, mapper); + } + + public final Maybe contains(Object value) { + return contains(value, ObjectHelper.equalsPredicate()); + } + + public final Maybe contains(final Object value, final BiPredicate comparer) { + ObjectHelper.requireNonNull(value, "value is null"); + ObjectHelper.requireNonNull(comparer, "comparer is null"); + return new MaybeContains(this, value, comparer); + } + + public final Flowable mergeWith(MaybeSource other) { + return merge(this, other); + } + + public final Maybe> nest() { + return just(this); + } + + public final Maybe observeOn(final Scheduler scheduler) { + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return new MaybeObserveOn(this, scheduler); + } + + public final Maybe onErrorReturn(final Function valueFunction) { + ObjectHelper.requireNonNull(valueFunction, "valueFunction is null"); + return new MaybeOnErrorReturn(this, valueFunction, null); + } + + public final Maybe onErrorReturn(final T value) { + ObjectHelper.requireNonNull(value, "value is null"); + return new MaybeOnErrorReturn(this, null, value); + } + + public final Maybe onErrorResumeNext( + final Function> nextFunction) { + ObjectHelper.requireNonNull(nextFunction, "nextFunction is null"); + return new MaybeOnErrorResumeNext(this, nextFunction); + } + + public final Flowable repeat() { + return toFlowable().repeat(); + } + + public final Flowable repeat(long times) { + return toFlowable().repeat(times); + } + + public final Flowable repeatWhen(Function, ? extends Publisher> handler) { + return toFlowable().repeatWhen(handler); + } + + public final Flowable repeatUntil(BooleanSupplier stop) { + return toFlowable().repeatUntil(stop); + } + + public final Maybe retry() { + return toFlowable().retry().toMaybe(); + } + + public final Maybe retry(long times) { + return toFlowable().retry(times).toMaybe(); + } + + public final Maybe retry(BiPredicate predicate) { + return toFlowable().retry(predicate).toMaybe(); + } + + public final Maybe retry(Predicate predicate) { + return toFlowable().retry(predicate).toMaybe(); + } + + public final Maybe retryWhen(Function, ? extends Publisher> handler) { + return toFlowable().retryWhen(handler).toMaybe(); + } + + public final void safeSubscribe(Subscriber s) { + toFlowable().safeSubscribe(s); + } + + public final Disposable subscribe() { + return subscribe(Functions.emptyConsumer(), Functions.EMPTY_ACTION, RxJavaPlugins.getErrorHandler()); + } + + public final Disposable subscribe(Consumer onSuccess) { + return subscribe(onSuccess, Functions.EMPTY_ACTION, RxJavaPlugins.getErrorHandler()); + } + + public final Disposable subscribe(Consumer onSuccess, final Action onComplete) { + return subscribe(onSuccess, onComplete, RxJavaPlugins.getErrorHandler()); + } + + public final Disposable subscribe(final Consumer onSuccess, final Action onComplete, final Consumer onError) { + ObjectHelper.requireNonNull(onSuccess, "onSuccess is null"); + ObjectHelper.requireNonNull(onComplete, "onComplete is null"); + ObjectHelper.requireNonNull(onError, "onError is null"); + + ConsumerMaybeObserver s = new ConsumerMaybeObserver(onSuccess, onComplete, onError); + subscribe(s); + return s; + } + + @Override + public final void subscribe(MaybeObserver subscriber) { + ObjectHelper.requireNonNull(subscriber, "subscriber is null"); + // TODO plugin wrapper + subscribeActual(subscriber); + } + + protected abstract void subscribeActual(MaybeObserver subscriber); + + public final void subscribe(Subscriber s) { + toFlowable().subscribe(s); + } + + public final void subscribe(Observer s) { + toObservable().subscribe(s); + } + + public final Maybe subscribeOn(final Scheduler scheduler) { + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return new MaybeSubscribeOn(this, scheduler); + } + + public final Maybe timeout(long timeout, TimeUnit unit) { + return timeout0(timeout, unit, Schedulers.computation(), null); + } + + public final Maybe timeout(long timeout, TimeUnit unit, Scheduler scheduler) { + return timeout0(timeout, unit, scheduler, null); + } + + public final Maybe timeout(long timeout, TimeUnit unit, Scheduler scheduler, MaybeSource other) { + ObjectHelper.requireNonNull(other, "other is null"); + return timeout0(timeout, unit, scheduler, other); + } + + public final Maybe timeout(long timeout, TimeUnit unit, MaybeSource other) { + ObjectHelper.requireNonNull(other, "other is null"); + return timeout0(timeout, unit, Schedulers.computation(), other); + } + + private Maybe timeout0(final long timeout, final TimeUnit unit, final Scheduler scheduler, final MaybeSource other) { + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return new MaybeTimeout(this, timeout, unit, scheduler, other); + } + + public final R to(Function, R> convert) { + try { + return convert.apply(this); + } catch (Throwable ex) { + throw Exceptions.propagate(ex); + } + } + + public final Flowable toFlowable() { + return new MaybeToFlowable(this); + } + + public final Observable toObservable() { + return new MaybeToObservable(this); + } + + public final Maybe zipWith(MaybeSource other, BiFunction zipper) { + return zip(this, other, zipper); + } +} diff --git a/src/main/java/io/reactivex/MaybeObserver.java b/src/main/java/io/reactivex/MaybeObserver.java new file mode 100644 index 0000000000..582d4fef24 --- /dev/null +++ b/src/main/java/io/reactivex/MaybeObserver.java @@ -0,0 +1,26 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.disposables.Disposable; + +public interface MaybeObserver { + void onSubscribe(Disposable d); + + void onSuccess(T value); + + void onComplete(); + + void onError(Throwable e); +} diff --git a/src/main/java/io/reactivex/MaybeOperator.java b/src/main/java/io/reactivex/MaybeOperator.java new file mode 100644 index 0000000000..dabd94323b --- /dev/null +++ b/src/main/java/io/reactivex/MaybeOperator.java @@ -0,0 +1,20 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.functions.Function; + +public interface MaybeOperator extends Function, MaybeObserver> { + +} diff --git a/src/main/java/io/reactivex/MaybeSource.java b/src/main/java/io/reactivex/MaybeSource.java new file mode 100644 index 0000000000..8ec739e5c0 --- /dev/null +++ b/src/main/java/io/reactivex/MaybeSource.java @@ -0,0 +1,28 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex; + +/** + * Represents a basic {@link Single} source base interface, + * consumable via an {@link SingleObserver}. + *

+ * This class also serves the base type for custom operators wrapped into + * Single via {@link Single#create(MaybeSource)}. + * + * @param the element type + * @since 2.0 + */ +public interface MaybeSource { + + void subscribe(MaybeObserver s); +} diff --git a/src/main/java/io/reactivex/MaybeTransformer.java b/src/main/java/io/reactivex/MaybeTransformer.java new file mode 100644 index 0000000000..d4ba029059 --- /dev/null +++ b/src/main/java/io/reactivex/MaybeTransformer.java @@ -0,0 +1,20 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.functions.Function; + +public interface MaybeTransformer extends Function, MaybeSource> { + +} diff --git a/src/main/java/io/reactivex/functions/Supplier.java b/src/main/java/io/reactivex/functions/Supplier.java new file mode 100644 index 0000000000..f8093257be --- /dev/null +++ b/src/main/java/io/reactivex/functions/Supplier.java @@ -0,0 +1,26 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.functions; + +/** + * A functional interface (callback) that returns an Object value. + */ +public interface Supplier { + /** + * Returns an Object value. + * @return an Object value + * @throws Exception on error + */ + R get() throws Exception; // NOPMD +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java new file mode 100644 index 0000000000..271d47f887 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java @@ -0,0 +1,87 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeAmbArray extends Maybe { + + final MaybeSource[] sources; + + public MaybeAmbArray(MaybeSource[] sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final AtomicBoolean once = new AtomicBoolean(); + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + for (MaybeSource s1 : sources) { + if (once.get()) { + return; + } + + if (s1 == null) { + set.dispose(); + Throwable e = new NullPointerException("One of the sources is null"); + if (once.compareAndSet(false, true)) { + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } + return; + } + + s1.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (once.compareAndSet(false, true)) { + s.onSuccess(value); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + s.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + }); + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java new file mode 100644 index 0000000000..a2ab5f62b6 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java @@ -0,0 +1,134 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeAmbIterable extends Maybe { + + final Iterable> sources; + + public MaybeAmbIterable(Iterable> sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + Iterator> iterator; + + try { + iterator = sources.iterator(); + } catch (Throwable e) { + s.onError(e); + return; + } + + if (iterator == null) { + s.onError(new NullPointerException("The iterator returned is null")); + return; + } + + final AtomicBoolean once = new AtomicBoolean(); + int c = 0; + + for (;;) { + if (once.get()) { + return; + } + + boolean b; + + try { + b = iterator.hasNext(); + } catch (Throwable e) { + s.onError(e); + return; + } + + if (once.get()) { + return; + } + + if (!b) { + break; + } + + if (once.get()) { + return; + } + + MaybeSource s1; + + try { + s1 = iterator.next(); + } catch (Throwable e) { + set.dispose(); + s.onError(e); + return; + } + + if (s1 == null) { + set.dispose(); + s.onError(new NullPointerException("The single source returned by the iterator is null")); + return; + } + + s1.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (once.compareAndSet(false, true)) { + s.onSuccess(value); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + s.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + }); + c++; + } + + if (c == 0 && !set.isDisposed()) { + s.onError(new NoSuchElementException()); + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAwait.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAwait.java new file mode 100644 index 0000000000..a5cdc0e85d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAwait.java @@ -0,0 +1,73 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.util.NotificationLite; + +public enum MaybeAwait { + ; + + public static T get(MaybeSource source, T defaultIfComplete) { + final AtomicReference valueRef = new AtomicReference(); + final AtomicReference errorRef = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + + source.subscribe(new MaybeObserver() { + @Override + public void onError(Throwable e) { + errorRef.lazySet(e); + cdl.countDown(); + } + + @Override + public void onSubscribe(Disposable d) { + } + + @Override + public void onSuccess(T value) { + valueRef.lazySet(NotificationLite.next(value)); + cdl.countDown(); + } + + @Override + public void onComplete() { + valueRef.lazySet(NotificationLite.complete()); + } + }); + + if (cdl.getCount() != 0L) { + try { + cdl.await(); + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + } + Throwable e = errorRef.get(); + if (e != null) { + throw Exceptions.propagate(e); + } + + Object value = valueRef.get(); + if (NotificationLite.isComplete(value)) + return defaultIfComplete; + else + return NotificationLite.getValue(value); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCache.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCache.java new file mode 100644 index 0000000000..99357d0f57 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCache.java @@ -0,0 +1,122 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.util.NotificationLite; + +public final class MaybeCache extends Maybe { + + final MaybeSource source; + + final AtomicInteger wip; + final AtomicReference notification; + final List> subscribers; + + public MaybeCache(MaybeSource source) { + this.source = source; + this.wip = new AtomicInteger(); + this.notification = new AtomicReference(); + this.subscribers = new ArrayList>(); + } + + @Override + protected void subscribeActual(MaybeObserver s) { + + Object o = notification.get(); + if (o != null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + if (NotificationLite.isError(o)) { + s.onError(NotificationLite.getError(o)); + } else { + s.onSuccess(NotificationLite.getValue(o)); + } + return; + } + + synchronized (subscribers) { + o = notification.get(); + if (o == null) { + subscribers.add(s); + } + } + if (o != null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + if (NotificationLite.isError(o)) { + s.onError(NotificationLite.getError(o)); + } else { + s.onSuccess(NotificationLite.getValue(o)); + } + return; + } + + if (wip.getAndIncrement() != 0) { + return; + } + + source.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onSuccess(T value) { + notification.set(NotificationLite.next(value)); + List> list; + synchronized (subscribers) { + list = new ArrayList>(subscribers); + subscribers.clear(); + } + for (MaybeObserver s1 : list) { + s1.onSuccess(value); + } + } + + @Override + public void onComplete() { + notification.set(NotificationLite.complete()); + List> list; + synchronized (subscribers) { + list = new ArrayList>(subscribers); + subscribers.clear(); + } + for (MaybeObserver s1 : list) { + s1.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + notification.set(NotificationLite.error(e)); + List> list; + synchronized (subscribers) { + list = new ArrayList>(subscribers); + subscribers.clear(); + } + for (MaybeObserver s1 : list) { + s1.onError(e); + } + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeComplete.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeComplete.java new file mode 100644 index 0000000000..ab35809391 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeComplete.java @@ -0,0 +1,31 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class MaybeComplete extends Maybe { + public static final Maybe INSTANCE = new MaybeComplete(); + + private MaybeComplete() { + } + + @Override + protected void subscribeActual(MaybeObserver s) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onComplete(); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeContains.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeContains.java new file mode 100644 index 0000000000..f4bcb83bdc --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeContains.java @@ -0,0 +1,72 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiPredicate; + +public final class MaybeContains extends Maybe { + + final MaybeSource source; + + final Object value; + + final BiPredicate comparer; + + public MaybeContains(MaybeSource source, Object value, BiPredicate comparer) { + this.source = source; + this.value = value; + this.comparer = comparer; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T v) { + boolean b; + + try { + b = comparer.test(v, value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.onError(ex); + return; + } + s.onSuccess(b); + } + + @Override + public void onComplete() { + s.onSuccess(false); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDefer.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDefer.java new file mode 100644 index 0000000000..3c50b6521e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDefer.java @@ -0,0 +1,50 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.Callable; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class MaybeDefer extends Maybe { + + final Callable> maybeSupplier; + + public MaybeDefer(Callable> maybeSupplier) { + this.maybeSupplier = maybeSupplier; + } + + @Override + protected void subscribeActual(MaybeObserver s) { + MaybeSource next; + + try { + next = maybeSupplier.call(); + } catch (Throwable e) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(e); + return; + } + + if (next == null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(new NullPointerException("The Maybe supplied was null")); + return; + } + + next.subscribe(s); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelay.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelay.java new file mode 100644 index 0000000000..66f2e32fcb --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelay.java @@ -0,0 +1,75 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.TimeUnit; + +import io.reactivex.*; +import io.reactivex.disposables.*; + +public final class MaybeDelay extends Maybe { + + + final MaybeSource source; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + + public MaybeDelay(MaybeSource source, long time, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final SerialDisposable sd = new SerialDisposable(); + s.onSubscribe(sd); + source.subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(final T value) { + sd.replace(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onSuccess(value); + } + }, time, unit)); + } + + @Override + public void onComplete() { + sd.replace(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onComplete(); + } + }, time, unit)); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithCompletable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithCompletable.java new file mode 100644 index 0000000000..30cd403f43 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithCompletable.java @@ -0,0 +1,114 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +public final class MaybeDelayWithCompletable extends Maybe { + + final MaybeSource source; + + final CompletableSource other; + + public MaybeDelayWithCompletable(MaybeSource source, CompletableSource other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + other.subscribe(new OtherObserver(subscriber, source)); + } + + static final class OtherObserver + extends AtomicReference + implements CompletableObserver, Disposable { + + /** */ + private static final long serialVersionUID = -8565274649390031272L; + + final MaybeObserver actual; + + final MaybeSource source; + + public OtherObserver(MaybeObserver actual, MaybeSource source) { + this.actual = actual; + this.source = source; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.set(this, d)) { + + actual.onSubscribe(this); + } + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + source.subscribe(new DelayWithMainObserver(this, actual)); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } + + static final class DelayWithMainObserver implements MaybeObserver { + + final AtomicReference parent; + + final MaybeObserver actual; + + public DelayWithMainObserver(AtomicReference parent, MaybeObserver actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(parent, d); + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithMaybe.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithMaybe.java new file mode 100644 index 0000000000..2dd7c6211a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithMaybe.java @@ -0,0 +1,98 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable.DelayWithMainObserver; + +public final class MaybeDelayWithMaybe extends Maybe { + + final MaybeSource source; + + final MaybeSource other; + + public MaybeDelayWithMaybe(MaybeSource source, MaybeSource other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + other.subscribe(new OtherObserver(subscriber, source)); + } + + static final class OtherObserver + extends AtomicReference + implements MaybeObserver, Disposable { + + /** */ + private static final long serialVersionUID = -8565274649390031272L; + + final MaybeObserver actual; + + final MaybeSource source; + + boolean done; + + public OtherObserver(MaybeObserver actual, MaybeSource source) { + this.actual = actual; + this.source = source; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.set(this, d)) { + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(U value) { + if (done) { + return; + } + done = true; + source.subscribe(new DelayWithMainObserver(this, actual)); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + source.subscribe(new DelayWithMainObserver(this, actual)); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithObservable.java new file mode 100644 index 0000000000..0e3c920052 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithObservable.java @@ -0,0 +1,101 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable.DelayWithMainObserver; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeDelayWithObservable extends Maybe { + + final MaybeSource source; + + final ObservableSource other; + + public MaybeDelayWithObservable(MaybeSource source, ObservableSource other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + other.subscribe(new OtherSubscriber(subscriber, source)); + } + + static final class OtherSubscriber + extends AtomicReference + implements Observer, Disposable { + + /** */ + private static final long serialVersionUID = -8565274649390031272L; + + final MaybeObserver actual; + + final MaybeSource source; + + boolean done; + + public OtherSubscriber(MaybeObserver actual, MaybeSource source) { + this.actual = actual; + this.source = source; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.set(this, d)) { + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(U value) { + get().dispose(); + onComplete(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + return; + } + done = true; + actual.onError(e); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + source.subscribe(new DelayWithMainObserver(this, actual)); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithPublisher.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithPublisher.java new file mode 100644 index 0000000000..5dc8f03999 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithPublisher.java @@ -0,0 +1,110 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable.DelayWithMainObserver; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeDelayWithPublisher extends Maybe { + + final MaybeSource source; + + final Publisher other; + + public MaybeDelayWithPublisher(MaybeSource source, Publisher other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + other.subscribe(new OtherSubscriber(subscriber, source)); + } + + static final class OtherSubscriber + extends AtomicReference + implements Subscriber, Disposable { + + /** */ + private static final long serialVersionUID = -8565274649390031272L; + + final MaybeObserver actual; + + final MaybeSource source; + + boolean done; + + Subscription s; + + public OtherSubscriber(MaybeObserver actual, MaybeSource source) { + this.actual = actual; + this.source = source; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(U value) { + get().dispose(); + onComplete(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + return; + } + done = true; + actual.onError(e); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + source.subscribe(new DelayWithMainObserver(this, actual)); + } + + @Override + public void dispose() { + s.cancel(); + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithSingle.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithSingle.java new file mode 100644 index 0000000000..c99e880c87 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithSingle.java @@ -0,0 +1,83 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable.DelayWithMainObserver; + +public final class MaybeDelayWithSingle extends Maybe { + + final MaybeSource source; + + final SingleSource other; + + public MaybeDelayWithSingle(MaybeSource source, SingleSource other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + other.subscribe(new OtherObserver(subscriber, source)); + } + + static final class OtherObserver + extends AtomicReference + implements SingleObserver, Disposable { + + /** */ + private static final long serialVersionUID = -8565274649390031272L; + + final MaybeObserver actual; + + final MaybeSource source; + + public OtherObserver(MaybeObserver actual, MaybeSource source) { + this.actual = actual; + this.source = source; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.set(this, d)) { + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(U value) { + source.subscribe(new DelayWithMainObserver(this, actual)); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnCancel.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnCancel.java new file mode 100644 index 0000000000..1591f397cc --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnCancel.java @@ -0,0 +1,60 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.functions.Action; + +public final class MaybeDoOnCancel extends Maybe { + final MaybeSource source; + + final Action onCancel; + + public MaybeDoOnCancel(MaybeSource source, Action onCancel) { + this.source = source; + this.onCancel = onCancel; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + CompositeDisposable set = new CompositeDisposable(); + set.add(Disposables.from(onCancel)); + set.add(d); + s.onSubscribe(set); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnComplete.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnComplete.java new file mode 100644 index 0000000000..aedfd9883b --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnComplete.java @@ -0,0 +1,64 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Action; + +public class MaybeDoOnComplete extends Maybe { + + final MaybeSource source; + + final Action onComplete; + + public MaybeDoOnComplete(MaybeSource source, Action onComplete) { + this.source = source; + this.onComplete = onComplete; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onComplete() { + try { + onComplete.run(); + } catch (Throwable ex) { + s.onError(ex); + return; + } + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnError.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnError.java new file mode 100644 index 0000000000..f160224cdb --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnError.java @@ -0,0 +1,64 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.Consumer; + +public final class MaybeDoOnError extends Maybe { + + final MaybeSource source; + + final Consumer onError; + + public MaybeDoOnError(MaybeSource source, Consumer onError) { + this.source = source; + this.onError = onError; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + try { + onError.accept(e); + } catch (Throwable ex) { + e = new CompositeException(ex, e); + } + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnSubscribe.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnSubscribe.java new file mode 100644 index 0000000000..dc8d6f3b17 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnSubscribe.java @@ -0,0 +1,81 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeDoOnSubscribe extends Maybe { + + final MaybeSource source; + + final Consumer onSubscribe; + + public MaybeDoOnSubscribe(MaybeSource source, Consumer onSubscribe) { + this.source = source; + this.onSubscribe = onSubscribe; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + boolean done; + @Override + public void onSubscribe(Disposable d) { + try { + onSubscribe.accept(d); + } catch (Throwable ex) { + done = true; + d.dispose(); + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(ex); + return; + } + + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + if (done) { + return; + } + s.onSuccess(value); + } + + @Override + public void onComplete() { + if (done) { + return; + } + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + return; + } + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnSuccess.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnSuccess.java new file mode 100644 index 0000000000..55ab3d118d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnSuccess.java @@ -0,0 +1,64 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Consumer; + +public class MaybeDoOnSuccess extends Maybe { + + final MaybeSource source; + + final Consumer onSuccess; + + public MaybeDoOnSuccess(MaybeSource source, Consumer onSuccess) { + this.source = source; + this.onSuccess = onSuccess; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + try { + onSuccess.accept(value); + } catch (Throwable ex) { + s.onError(ex); + return; + } + s.onSuccess(value); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeEquals.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeEquals.java new file mode 100644 index 0000000000..08fe486d8e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeEquals.java @@ -0,0 +1,91 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeEquals extends Maybe { + + final MaybeSource first; + final MaybeSource second; + + public MaybeEquals(MaybeSource first, MaybeSource second) { + this.first = first; + this.second = second; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final AtomicInteger count = new AtomicInteger(); + final Object[] values = { null, null }; + + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + class InnerObserver implements MaybeObserver { + final int index; + public InnerObserver(int index) { + this.index = index; + } + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + values[index] = value; + + if (count.incrementAndGet() == 2) { + s.onSuccess(ObjectHelper.equals(values[0], values[1])); + } + } + + @Override + public void onComplete() { + values[index] = Notification.createOnComplete(); + + if (count.incrementAndGet() == 2) { + s.onSuccess(ObjectHelper.equals(values[0], values[1])); + } + } + + @Override + public void onError(Throwable e) { + for (;;) { + int state = count.get(); + if (state >= 2) { + RxJavaPlugins.onError(e); + return; + } + if (count.compareAndSet(state, 2)) { + s.onError(e); + return; + } + } + } + + } + + first.subscribe(new InnerObserver(0)); + second.subscribe(new InnerObserver(1)); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeError.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeError.java new file mode 100644 index 0000000000..2876ad157e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeError.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.Callable; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class MaybeError extends Maybe { + + final Callable errorSupplier; + + public MaybeError(Callable errorSupplier) { + this.errorSupplier = errorSupplier; + } + + @Override + protected void subscribeActual(MaybeObserver s) { + Throwable error; + + try { + error = errorSupplier.call(); + } catch (Throwable e) { + error = e; + } + + if (error == null) { + error = new NullPointerException(); + } + + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(error); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMap.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMap.java new file mode 100644 index 0000000000..39fad96189 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMap.java @@ -0,0 +1,108 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.functions.Function; + +public final class MaybeFlatMap extends Maybe { + final MaybeSource source; + + final Function> mapper; + + public MaybeFlatMap(MaybeSource source, Function> mapper) { + this.mapper = mapper; + this.source = source; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + MaybeFlatMapCallback parent = new MaybeFlatMapCallback(subscriber, mapper); + subscriber.onSubscribe(parent.sd); + source.subscribe(parent); + } + + static final class MaybeFlatMapCallback implements MaybeObserver { + final MaybeObserver actual; + final Function> mapper; + + final SerialDisposable sd; + + public MaybeFlatMapCallback(MaybeObserver actual, + Function> mapper) { + this.actual = actual; + this.mapper = mapper; + this.sd = new SerialDisposable(); + } + + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(T value) { + MaybeSource o; + + try { + o = mapper.apply(value); + } catch (Throwable e) { + actual.onError(e); + return; + } + + if (o == null) { + actual.onError(new NullPointerException("The single returned by the mapper is null")); + return; + } + + if (sd.isDisposed()) { + return; + } + + o.subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(R value) { + actual.onSuccess(value); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + }); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromCallable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromCallable.java new file mode 100644 index 0000000000..5aa6ab010d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromCallable.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.Callable; + +import io.reactivex.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class MaybeFromCallable extends Maybe { + + final Callable callable; + + public MaybeFromCallable(Callable callable) { + this.callable = callable; + } + + @Override + protected void subscribeActual(MaybeObserver s) { + + s.onSubscribe(EmptyDisposable.INSTANCE); + try { + T v = callable.call(); + if (v != null) { + s.onSuccess(v); + } else { + s.onComplete(); + } + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + s.onError(e); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromObservable.java new file mode 100644 index 0000000000..50a8440ef5 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromObservable.java @@ -0,0 +1,56 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.maybe; + +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.Maybe; +import io.reactivex.MaybeObserver; +import io.reactivex.disposables.Disposable; + +public final class MaybeFromObservable extends Maybe { + private final Observable upstream; + + public MaybeFromObservable(Observable upstream) { + this.upstream = upstream; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + upstream.subscribe(new Observer() { + T last; + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + @Override + public void onNext(T value) { + last = value; + } + @Override + public void onError(Throwable e) { + s.onError(e); + } + @Override + public void onComplete() { + T v = last; + last = null; + if (v != null) { + s.onSuccess(v); + } else { + s.onComplete(); + } + } + }); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromPublisher.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromPublisher.java new file mode 100644 index 0000000000..765f446cb6 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromPublisher.java @@ -0,0 +1,114 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeFromPublisher extends Maybe { + + final Publisher publisher; + + public MaybeFromPublisher(Publisher publisher) { + this.publisher = publisher; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + publisher.subscribe(new ToMaybeSubscriber(s)); + } + + static final class ToMaybeSubscriber implements Subscriber, Disposable { + final MaybeObserver actual; + + Subscription s; + + T value; + + boolean done; + + volatile boolean disposed; + + public ToMaybeSubscriber(MaybeObserver actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + if (value != null) { + s.cancel(); + done = true; + this.value = null; + actual.onError(new IndexOutOfBoundsException("Too many elements in the Publisher")); + } else { + value = t; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + this.value = null; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + T v = this.value; + this.value = null; + if (v == null) { + actual.onComplete(); + } else { + actual.onSuccess(v); + } + } + + @Override + public boolean isDisposed() { + return disposed; + } + + @Override + public void dispose() { + disposed = true; + s.cancel(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSource.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSource.java new file mode 100644 index 0000000000..ea85ad9a67 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSource.java @@ -0,0 +1,86 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.maybe; + +import io.reactivex.Maybe; +import io.reactivex.MaybeObserver; +import io.reactivex.MaybeSource; +import io.reactivex.disposables.Disposable; +import java.util.concurrent.atomic.AtomicBoolean; + +public final class MaybeFromSource extends Maybe { + private final MaybeSource source; + + public MaybeFromSource(MaybeSource source) { + this.source = source; + } + + @Override protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new DisposeAwareMaybeObserver(observer)); + } + + /** + * An observer which does not send downstream notifications once disposed. Used to guard against + * naive implementations of {@link MaybeSource} which do not check for this. + */ + static final class DisposeAwareMaybeObserver + extends AtomicBoolean + implements MaybeObserver, Disposable { + private final MaybeObserver o; + private Disposable d; + + DisposeAwareMaybeObserver(MaybeObserver o) { + this.o = o; + } + + @Override + public void onSuccess(T value) { + if (!get()) { + o.onSuccess(value); + } + } + + @Override + public void onComplete() { + if (!get()) { + o.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + if (!get()) { + o.onError(e); + } + } + + @Override + public void onSubscribe(Disposable d) { + this.d = d; + o.onSubscribe(this); + } + + @Override + public void dispose() { + if (compareAndSet(false, true)) { + d.dispose(); + d = null; + } + } + + @Override + public boolean isDisposed() { + return get(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromUnsafeSource.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromUnsafeSource.java new file mode 100644 index 0000000000..c36169493a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromUnsafeSource.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; + +public final class MaybeFromUnsafeSource extends Maybe { + final MaybeSource source; + + public MaybeFromUnsafeSource(MaybeSource source) { + this.source = source; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(observer); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeHide.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeHide.java new file mode 100644 index 0000000000..6e5c232f6f --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeHide.java @@ -0,0 +1,77 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +public final class MaybeHide extends Maybe { + + final MaybeSource source; + + public MaybeHide(MaybeSource source) { + this.source = source; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + source.subscribe(new HideMaybeObserver(subscriber)); + } + + static final class HideMaybeObserver implements MaybeObserver, Disposable { + + final MaybeObserver actual; + + Disposable d; + + public HideMaybeObserver(MaybeObserver actual) { + this.actual = actual; + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeInternalHelper.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeInternalHelper.java new file mode 100644 index 0000000000..f331d2e781 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeInternalHelper.java @@ -0,0 +1,98 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.*; +import java.util.concurrent.Callable; + +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +/** + * Helper utility class to support Maybe with inner classes. + */ +public enum MaybeInternalHelper { + ; + + enum NoSuchElementCallable implements Callable { + INSTANCE; + + @Override + public NoSuchElementException call() throws Exception { + return new NoSuchElementException(); + } + } + + public static Callable emptyThrower() { + return NoSuchElementCallable.INSTANCE; + } + + @SuppressWarnings("rawtypes") + enum ToFlowable implements Function { + INSTANCE; + @SuppressWarnings("unchecked") + @Override + public Publisher apply(MaybeSource v){ + return new MaybeToFlowable(v); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Function, Publisher> toFlowable() { + return (Function)ToFlowable.INSTANCE; + } + + static final class ToFlowableIterator implements Iterator> { + private final Iterator> sit; + + ToFlowableIterator(Iterator> sit) { + this.sit = sit; + } + + @Override + public boolean hasNext() { + return sit.hasNext(); + } + + @Override + public Flowable next() { + return new MaybeToFlowable(sit.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + static final class ToFlowableIterable implements Iterable> { + + private final Iterable> sources; + + ToFlowableIterable(Iterable> sources) { + this.sources = sources; + } + + @Override + public Iterator> iterator() { + return new ToFlowableIterator(sources.iterator()); + } + } + + public static Iterable> iterableToFlowable(final Iterable> sources) { + return new ToFlowableIterable(sources); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeJust.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeJust.java new file mode 100644 index 0000000000..47827d8fc8 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeJust.java @@ -0,0 +1,33 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class MaybeJust extends Maybe { + + final T value; + + public MaybeJust(T value) { + this.value = value; + } + + @Override + protected void subscribeActual(MaybeObserver s) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onSuccess(value); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeLift.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeLift.java new file mode 100644 index 0000000000..517baff1b7 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeLift.java @@ -0,0 +1,50 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeLift extends Maybe { + + final MaybeSource source; + + final MaybeOperator onLift; + + public MaybeLift(MaybeSource source, MaybeOperator onLift) { + this.source = source; + this.onLift = onLift; + } + + @Override + protected void subscribeActual(MaybeObserver s) { + try { + MaybeObserver sr = onLift.apply(s); + + if (sr == null) { + throw new NullPointerException("The onLift returned a null subscriber"); + } + // TODO plugin wrapper + source.subscribe(sr); + } catch (NullPointerException ex) { // NOPMD + throw ex; + } catch (Throwable ex) { + RxJavaPlugins.onError(ex); + NullPointerException npe = new NullPointerException("Not really but can't throw other than NPE"); + npe.initCause(ex); + throw npe; + } + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeMap.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMap.java new file mode 100644 index 0000000000..fdb5c2f072 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMap.java @@ -0,0 +1,62 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Function; + +public final class MaybeMap extends Maybe { + final MaybeSource source; + + final Function mapper; + + public MaybeMap(MaybeSource source, Function mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(final MaybeObserver t) { + source.subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + t.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + R v; + try { + v = mapper.apply(value); + } catch (Throwable e) { + onError(e); + return; + } + + t.onSuccess(v); + } + + @Override + public void onComplete() { + t.onComplete(); + } + + @Override + public void onError(Throwable e) { + t.onError(e); + } + }); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeNever.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeNever.java new file mode 100644 index 0000000000..4bc8ada83b --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeNever.java @@ -0,0 +1,30 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.internal.disposables.EmptyDisposable; + +public final class MaybeNever extends Maybe { + public static final Maybe INSTANCE = new MaybeNever(); + + private MaybeNever() { + } + + @Override + protected void subscribeActual(MaybeObserver s) { + s.onSubscribe(EmptyDisposable.INSTANCE); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeObserveOn.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeObserveOn.java new file mode 100644 index 0000000000..81c89a3513 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeObserveOn.java @@ -0,0 +1,75 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.*; + +public final class MaybeObserveOn extends Maybe { + + final MaybeSource source; + + final Scheduler scheduler; + + public MaybeObserveOn(MaybeSource source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final CompositeDisposable mad = new CompositeDisposable(); + s.onSubscribe(mad); + + source.subscribe(new MaybeObserver() { + + @Override + public void onError(final Throwable e) { + mad.add(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onError(e); + } + })); + } + + @Override + public void onSubscribe(Disposable d) { + mad.add(d); + } + + @Override + public void onSuccess(final T value) { + mad.add(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onSuccess(value); + } + })); + } + + @Override + public void onComplete() { + mad.add(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onComplete(); + } + })); + } + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnCompleteResumeNext.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnCompleteResumeNext.java new file mode 100644 index 0000000000..d46e840a20 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnCompleteResumeNext.java @@ -0,0 +1,99 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.functions.Supplier; + +public final class MaybeOnCompleteResumeNext extends Maybe { + final MaybeSource source; + + final Supplier> nextSupplier; + + public MaybeOnCompleteResumeNext(MaybeSource source, + Supplier> nextSupplier) { + this.source = source; + this.nextSupplier = nextSupplier; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final SerialDisposable sd = new SerialDisposable(); + s.onSubscribe(sd); + + source.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onComplete() { + MaybeSource next; + + try { + next = nextSupplier.get(); + } catch (Throwable ex) { + s.onError(ex); + return; + } + + if (next == null) { + NullPointerException npe = new NullPointerException("The next Maybe supplied was null"); + s.onError(npe); + return; + } + + next.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorComplete.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorComplete.java new file mode 100644 index 0000000000..71bb3ed838 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorComplete.java @@ -0,0 +1,83 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.Callable; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.CompositeException; + +public final class MaybeOnErrorComplete extends Maybe { + final MaybeSource source; + + final Callable valueSupplier; + + final T value; + + public MaybeOnErrorComplete(MaybeSource source, Callable valueSupplier, T value) { + this.source = source; + this.valueSupplier = valueSupplier; + this.value = value; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + + @Override + public void onError(Throwable e) { + T v; + + if (valueSupplier != null) { + try { + v = valueSupplier.call(); + } catch (Throwable ex) { + s.onError(new CompositeException(ex, e)); + return; + } + } else { + v = value; + } + + if (v == null) { + NullPointerException npe = new NullPointerException("Value supplied was null"); + npe.initCause(e); + s.onError(npe); + return; + } + + s.onSuccess(v); + } + + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorResumeNext.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorResumeNext.java new file mode 100644 index 0000000000..fc4f660bea --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorResumeNext.java @@ -0,0 +1,101 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.Function; + +public final class MaybeOnErrorResumeNext extends Maybe { + final MaybeSource source; + + final Function> nextFunction; + + public MaybeOnErrorResumeNext(MaybeSource source, + Function> nextFunction) { + this.source = source; + this.nextFunction = nextFunction; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final SerialDisposable sd = new SerialDisposable(); + s.onSubscribe(sd); + + source.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + MaybeSource next; + + try { + next = nextFunction.apply(e); + } catch (Throwable ex) { + s.onError(new CompositeException(ex, e)); + return; + } + + if (next == null) { + NullPointerException npe = new NullPointerException("The next Maybe supplied was null"); + npe.initCause(e); + s.onError(npe); + return; + } + + next.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + sd.replace(d); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + }); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorReturn.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorReturn.java new file mode 100644 index 0000000000..2de2a2825d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeOnErrorReturn.java @@ -0,0 +1,86 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.Maybe; +import io.reactivex.MaybeObserver; +import io.reactivex.MaybeSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.Function; + +public final class MaybeOnErrorReturn extends Maybe { + final MaybeSource source; + + final Function valueFunction; + + final T value; + + public MaybeOnErrorReturn(MaybeSource source, Function valueFunction, T value) { + this.source = source; + this.valueFunction = valueFunction; + this.value = value; + } + + + + @Override + protected void subscribeActual(final MaybeObserver s) { + + source.subscribe(new MaybeObserver() { + + @Override + public void onError(Throwable e) { + T v; + + if (valueFunction != null) { + try { + v = valueFunction.apply(e); + } catch (Throwable ex) { + s.onError(new CompositeException(ex, e)); + return; + } + } else { + v = value; + } + + if (v == null) { + NullPointerException npe = new NullPointerException("Value supplied was null"); + npe.initCause(e); + s.onError(npe); + return; + } + + s.onSuccess(v); + } + + @Override + public void onSubscribe(Disposable d) { + s.onSubscribe(d); + } + + @Override + public void onComplete() { + s.onComplete(); + } + + @Override + public void onSuccess(T value) { + s.onSuccess(value); + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeSubscribeOn.java new file mode 100644 index 0000000000..d278ec5b4d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeSubscribeOn.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; + +public final class MaybeSubscribeOn extends Maybe { + final MaybeSource source; + + final Scheduler scheduler; + + public MaybeSubscribeOn(MaybeSource source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + // FIXME cancel schedule + scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + source.subscribe(s); + } + }); + + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeTimeout.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeTimeout.java new file mode 100644 index 0000000000..8b70a6b2d8 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeTimeout.java @@ -0,0 +1,126 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.*; +import io.reactivex.disposables.*; + +public final class MaybeTimeout extends Maybe { + + final MaybeSource source; + + final long timeout; + + final TimeUnit unit; + + final Scheduler scheduler; + + final MaybeSource other; + + public MaybeTimeout(MaybeSource source, long timeout, TimeUnit unit, Scheduler scheduler, + MaybeSource other) { + this.source = source; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + this.other = other; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final CompositeDisposable set = new CompositeDisposable(); + s.onSubscribe(set); + + final AtomicBoolean once = new AtomicBoolean(); + + Disposable timer = scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + if (once.compareAndSet(false, true)) { + if (other != null) { + set.clear(); + other.subscribe(new MaybeObserver() { + + @Override + public void onError(Throwable e) { + set.dispose(); + s.onError(e); + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + set.dispose(); + s.onSuccess(value); + } + + @Override + public void onComplete() { + set.dispose(); + s.onComplete(); + } + }); + } else { + set.dispose(); + s.onError(new TimeoutException()); + } + } + } + }, timeout, unit); + + set.add(timer); + + source.subscribe(new MaybeObserver() { + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onError(e); + } + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onSuccess(value); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + set.dispose(); + s.onComplete(); + } + } + }); + + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeTimer.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeTimer.java new file mode 100644 index 0000000000..e1e52dc6a2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeTimer.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.TimeUnit; + +import io.reactivex.*; +import io.reactivex.disposables.SerialDisposable; + +public final class MaybeTimer extends Maybe { + + final long delay; + final TimeUnit unit; + final Scheduler scheduler; + + public MaybeTimer(long delay, TimeUnit unit, Scheduler scheduler) { + this.delay = delay; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + SerialDisposable sd = new SerialDisposable(); + + s.onSubscribe(sd); + + sd.replace(scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + s.onSuccess(0L); + } + }, delay, unit)); + } + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeToFlowable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeToFlowable.java new file mode 100644 index 0000000000..f80317fb79 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeToFlowable.java @@ -0,0 +1,82 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.maybe; + +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.subscriptions.DeferredScalarSubscription; + +/** + * Wraps a Maybe and exposes it as a Flowable. + * + * @param the value type + */ +public final class MaybeToFlowable extends Flowable { + + final MaybeSource source; + + public MaybeToFlowable(MaybeSource source) { + this.source = source; + } + + @Override + public void subscribeActual(final Subscriber s) { + source.subscribe(new MaybeToFlowableObserver(s)); + } + + static final class MaybeToFlowableObserver extends DeferredScalarSubscription + implements MaybeObserver { + + /** */ + private static final long serialVersionUID = 187782011903685568L; + + Disposable d; + + public MaybeToFlowableObserver(Subscriber actual) { + super(actual); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + complete(value); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void cancel() { + super.cancel(); + d.dispose(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeToObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeToObservable.java new file mode 100644 index 0000000000..ba5ba41a2c --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeToObservable.java @@ -0,0 +1,83 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * Wraps a Maybe and exposes it as a Flowable. + * + * @param the value type + */ +public final class MaybeToObservable extends Observable { + + final MaybeSource source; + + public MaybeToObservable(MaybeSource source) { + this.source = source; + } + + @Override + public void subscribeActual(final Observer s) { + source.subscribe(new MaybeToObservableObserver(s)); + } + + static final class MaybeToObservableObserver + implements MaybeObserver, Disposable { + + final Observer actual; + + Disposable d; + + public MaybeToObservableObserver(Observer actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + actual.onNext(value); + actual.onComplete(); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java new file mode 100644 index 0000000000..02cc3a889c --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java @@ -0,0 +1,152 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.Callable; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.functions.*; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.plugins.RxJavaPlugins; + +public final class MaybeUsing extends Maybe { + + final Callable resourceSupplier; + final Function> maybeFunction; + final Consumer disposer; + final boolean eager; + + public MaybeUsing(Callable resourceSupplier, Function> maybeFunction, Consumer disposer, boolean eager) { + this.resourceSupplier = resourceSupplier; + this.maybeFunction = maybeFunction; + this.disposer = disposer; + this.eager = eager; + } + + @Override + protected void subscribeActual(final MaybeObserver s) { + + final U resource; // NOPMD + + try { + resource = resourceSupplier.call(); + } catch (Throwable ex) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(ex); + return; + } + + MaybeSource s1; + + try { + s1 = maybeFunction.apply(resource); + } catch (Throwable ex) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(ex); + return; + } + + if (s1 == null) { + s.onSubscribe(EmptyDisposable.INSTANCE); + s.onError(new NullPointerException("The Maybe supplied by the function was null")); + return; + } + + s1.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + if (eager) { + CompositeDisposable set = new CompositeDisposable(); + set.add(d); + set.add(Disposables.from(new Runnable() { + @Override + public void run() { + try { + disposer.accept(resource); + } catch (Throwable e) { + RxJavaPlugins.onError(e); + } + } + })); + } else { + s.onSubscribe(d); + } + } + + @Override + public void onSuccess(T value) { + if (eager) { + try { + disposer.accept(resource); + } catch (Throwable e) { + s.onError(e); + return; + } + } + s.onSuccess(value); + if (!eager) { + try { + disposer.accept(resource); + } catch (Throwable e) { + RxJavaPlugins.onError(e); + } + } + } + + @Override + public void onComplete() { + if (eager) { + try { + disposer.accept(resource); + } catch (Throwable e) { + s.onError(e); + return; + } + } + s.onComplete(); + if (!eager) { + try { + disposer.accept(resource); + } catch (Throwable e) { + RxJavaPlugins.onError(e); + } + } + } + + @Override + public void onError(Throwable e) { + if (eager) { + try { + disposer.accept(resource); + } catch (Throwable ex) { + e = new CompositeException(ex, e); + } + } + s.onError(e); + if (!eager) { + try { + disposer.accept(resource); + } catch (Throwable ex) { + RxJavaPlugins.onError(ex); + } + } + } + + }); + } + +} diff --git a/src/main/java/io/reactivex/internal/subscribers/maybe/ConsumerMaybeObserver.java b/src/main/java/io/reactivex/internal/subscribers/maybe/ConsumerMaybeObserver.java new file mode 100644 index 0000000000..8feb78a597 --- /dev/null +++ b/src/main/java/io/reactivex/internal/subscribers/maybe/ConsumerMaybeObserver.java @@ -0,0 +1,89 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.MaybeObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Action; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +public final class ConsumerMaybeObserver +extends AtomicReference +implements MaybeObserver, Disposable { + + /** */ + private static final long serialVersionUID = -7012088219455310787L; + + final Consumer onSuccess; + + final Action onComplete; + + final Consumer onError; + + public ConsumerMaybeObserver(Consumer onSuccess, Action onComplete, Consumer onError) { + this.onSuccess = onSuccess; + this.onComplete = onComplete; + this.onError = onError; + } + + @Override + public void onError(Throwable e) { + try { + onError.accept(e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(new CompositeException(e, ex)); + } + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(T value) { + try { + onSuccess.accept(value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + @Override + public void onComplete() { + try { + onComplete.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return get() == DisposableHelper.DISPOSED; + } +}