diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 393ef9f20d..17b462f3b6 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -231,7 +231,7 @@ public static Flowable combineLatest(Function c public static Flowable combineLatest(Publisher[] sources, Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (sources.length == 0) { return empty(); } @@ -303,7 +303,7 @@ public static Flowable combineLatest(Iterable combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableCombineLatest(sources, combiner, bufferSize, false)); } @@ -408,7 +408,7 @@ public static Flowable combineLatestDelayError(Publisher[ Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (sources.length == 0) { return empty(); } @@ -484,7 +484,7 @@ public static Flowable combineLatestDelayError(Iterable combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableCombineLatest(sources, combiner, bufferSize, true)); } @@ -3408,7 +3408,7 @@ public static Flowable sequenceEqual(Publisher source1 ObjectHelper.requireNonNull(source1, "source1 is null"); ObjectHelper.requireNonNull(source2, "source2 is null"); ObjectHelper.requireNonNull(isEqual, "isEqual is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableSequenceEqual(source1, source2, isEqual, bufferSize)); } @@ -3753,36 +3753,6 @@ public static Flowable using(Callable resourceSupplier, return RxJavaPlugins.onAssembly(new FlowableUsing(resourceSupplier, sourceSupplier, resourceDisposer, eager)); } - /** - * Validate that the given value is positive or report an IllegalArgumentException with - * the parameter name. - * @param value the value to validate - * @param paramName the parameter name of the value - * @return the value - * @throws IllegalArgumentException if bufferSize <= 0 - */ - protected static int verifyPositive(int value, String paramName) { - if (value <= 0) { - throw new IllegalArgumentException(paramName + " > 0 required but it was " + value); - } - return value; - } - - /** - * Validate that the given value is positive or report an IllegalArgumentException with - * the parameter name. - * @param value the value to validate - * @param paramName the parameter name of the value - * @return the value - * @throws IllegalArgumentException if bufferSize <= 0 - */ - protected static long verifyPositive(long value, String paramName) { - if (value <= 0L) { - throw new IllegalArgumentException(paramName + " > 0 required but it was " + value); - } - return value; - } - /** * Returns a Flowable that emits the results of a specified combiner function applied to combinations of * items emitted, in sequence, by an Iterable of other Publishers. @@ -4614,7 +4584,7 @@ public static Flowable zipArray(Function(sources, null, zipper, bufferSize, delayError)); } @@ -4674,7 +4644,7 @@ public static Flowable zipIterable(Iterable(null, sources, zipper, bufferSize, delayError)); } @@ -4894,7 +4864,7 @@ public final Iterable blockingIterable() { * @see ReactiveX documentation: To */ public final Iterable blockingIterable(int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return new BlockingFlowableIterable(this, bufferSize); } @@ -5560,7 +5530,7 @@ public final > Flowable buffer( ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null"); - verifyPositive(count, "count"); + ObjectHelper.verifyPositive(count, "count"); return RxJavaPlugins.onAssembly(new FlowableBufferTimed(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize)); } @@ -5956,7 +5926,7 @@ public final Flowable cache() { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable cacheWithInitialCapacity(int initialCapacity) { - verifyPositive(initialCapacity, "initialCapacity"); + ObjectHelper.verifyPositive(initialCapacity, "initialCapacity"); return FlowableCache.from(this, initialCapacity); } @@ -6151,7 +6121,7 @@ public final Flowable concatMap(Function(this, mapper, prefetch, ErrorMode.IMMEDIATE)); } @@ -6221,7 +6191,7 @@ public final Flowable concatMapDelayError(Function(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.IMMEDIATE)); } @@ -6278,8 +6248,8 @@ public final Flowable concatMapEager(Function Flowable concatMapEager(Function> mapper, int maxConcurrency, int prefetch) { - verifyPositive(maxConcurrency, "maxConcurrency"); - verifyPositive(prefetch, "prefetch"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(this, mapper, maxConcurrency, prefetch, ErrorMode.IMMEDIATE)); } @@ -7731,8 +7701,8 @@ public final Flowable flatMap(Function(this, mapper, delayErrors, maxConcurrency, bufferSize)); } @@ -8490,7 +8460,7 @@ public final Flowable> groupBy(Function(this, keySelector, valueSelector, bufferSize, delayError)); } @@ -8922,7 +8892,7 @@ public final Flowable observeOn(Scheduler scheduler, boolean delayError) { @SchedulerSupport(SchedulerSupport.CUSTOM) public final Flowable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableObserveOn(this, scheduler, delayError, bufferSize)); } @@ -9083,7 +9053,7 @@ public final Flowable onBackpressureBuffer(int capacity, boolean delayError) @BackpressureSupport(BackpressureKind.SPECIAL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded) { - verifyPositive(capacity, "bufferSize"); + ObjectHelper.verifyPositive(capacity, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION)); } @@ -9184,7 +9154,7 @@ public final Flowable onBackpressureBuffer(int capacity, Action onOverflow) { */ public final Publisher onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy) { ObjectHelper.requireNonNull(overflowStrategy, "strategy is null"); - verifyPositive(capacity, "capacity"); + ObjectHelper.verifyPositive(capacity, "capacity"); return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy(this, capacity, onOverflow, overflowStrategy)); } @@ -9585,7 +9555,7 @@ public final Flowable publish(Function, ? extends Pub @SchedulerSupport(SchedulerSupport.NONE) public final Flowable publish(Function, ? extends Publisher> selector, int prefetch) { ObjectHelper.requireNonNull(selector, "selector is null"); - verifyPositive(prefetch, "prefetch"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new FlowablePublishMulticast(this, selector, prefetch, false)); } @@ -9613,7 +9583,7 @@ public final Flowable publish(Function, ? extends Pub @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final ConnectableFlowable publish(int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return FlowablePublish.create(this, bufferSize); } @@ -11299,7 +11269,7 @@ public final Flowable skipLast(long time, TimeUnit unit, Scheduler scheduler, public final Flowable skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // the internal buffer holds pairs of (timestamp, value) so double the default buffer size int s = bufferSize << 1; return RxJavaPlugins.onAssembly(new FlowableSkipLastTimed(this, time, unit, scheduler, s, delayError)); @@ -11533,7 +11503,7 @@ public final Flowable startWithArray(T... items) { *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* * - * @return a {@link Subscription} reference with which the {@link Subscriber} can stop receiving items before + * @return a {@link Disposable} reference with which the caller can stop receiving items before * the Publisher has finished sending them * @see ReactiveX operators documentation: Subscribe */ @@ -11558,7 +11528,7 @@ public final Disposable subscribe() { * * @param onNext * the {@code Consumer} you have designed to accept emissions from the Publisher - * @return a {@link Subscription} reference with which the {@link Subscriber} can stop receiving items before + * @return a {@link Disposable} reference with which the caller can stop receiving items before * the Publisher has finished sending them * @throws NullPointerException * if {@code onNext} is null @@ -11587,7 +11557,7 @@ public final Disposable subscribe(Consumer onNext) { * @param onError * the {@code Consumer} you have designed to accept any error notification from the * Publisher - * @return a {@link Subscription} reference with which the {@link Subscriber} can stop receiving items before + * @return a {@link Disposable} reference with which the caller can stop receiving items before * the Publisher has finished sending them * @see ReactiveX operators documentation: Subscribe * @throws IllegalArgumentException @@ -11619,7 +11589,7 @@ public final Disposable subscribe(Consumer onNext, Consumer onNext, Consumer Flowable switchMap0(Function> } return FlowableScalarXMap.scalarXMap(v, mapper); } - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableSwitchMap(this, mapper, bufferSize, delayError)); } @@ -12195,7 +12165,7 @@ public final Flowable takeLast(long count, long time, TimeUnit unit, Schedule public final Flowable takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (count < 0) { throw new IndexOutOfBoundsException("count >= 0 required but it was " + count); } @@ -13281,7 +13251,7 @@ public final Flowable> toList() { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> toList(final int capacityHint) { - verifyPositive(capacityHint, "capacityHint"); + ObjectHelper.verifyPositive(capacityHint, "capacityHint"); return RxJavaPlugins.onAssembly(new FlowableToList>(this, Functions.createArrayList(capacityHint))); } @@ -13849,9 +13819,9 @@ public final Flowable> window(long count, long skip) { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> window(long count, long skip, int bufferSize) { - verifyPositive(skip, "skip"); - verifyPositive(count, "count"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(skip, "skip"); + ObjectHelper.verifyPositive(count, "count"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableWindow(this, count, skip, bufferSize)); } @@ -13960,7 +13930,7 @@ public final Flowable> window(long timespan, long timeskip, TimeUnit @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.CUSTOM) public final Flowable> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.requireNonNull(unit, "unit is null"); return RxJavaPlugins.onAssembly(new FlowableWindowTimed(this, timespan, timeskip, unit, scheduler, Long.MAX_VALUE, bufferSize, false)); @@ -14231,10 +14201,10 @@ public final Flowable> window(long timespan, TimeUnit unit, public final Flowable> window( long timespan, TimeUnit unit, Scheduler scheduler, long count, boolean restart, int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.requireNonNull(unit, "unit is null"); - verifyPositive(count, "count"); + ObjectHelper.verifyPositive(count, "count"); return RxJavaPlugins.onAssembly(new FlowableWindowTimed(this, timespan, timespan, unit, scheduler, count, bufferSize, restart)); } diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index d5c74ccca9..29eebe4a99 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -15,13 +15,16 @@ import java.util.concurrent.Callable; -import io.reactivex.annotations.SchedulerSupport; +import org.reactivestreams.*; + +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.*; import io.reactivex.internal.functions.*; +import io.reactivex.internal.operators.flowable.*; import io.reactivex.internal.operators.maybe.*; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.TestSubscriber; @@ -37,6 +40,44 @@ */ public abstract class Maybe implements MaybeSource { + /** + * Runs multiple Maybe sources and signals the events of the first one that signals (cancelling + * the rest). + *
+ *
Scheduler:
+ *
{@code amb} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the Iterable sequence of sources + * @return the new Maybe instance + */ + public static Maybe amb(final Iterable> sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new MaybeAmbIterable(sources)); + } + + /** + * Runs multiple Maybe sources and signals the events of the first one that signals (cancelling + * the rest). + *
+ *
Scheduler:
+ *
{@code amb} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the array of sources + * @return the new Maybe instance + */ + @SuppressWarnings("unchecked") + public static Maybe ambArray(final MaybeSource... sources) { + if (sources.length == 0) { + return empty(); + } + if (sources.length == 1) { + return wrap((MaybeSource)sources[0]); + } + return RxJavaPlugins.onAssembly(new MaybeAmbArray(sources)); + } + /** * Provides an API (via a cold Maybe) that bridges the reactive world with the callback-style world. *

@@ -81,6 +122,161 @@ public static Maybe create(MaybeOnSubscribe onSubscribe) { return RxJavaPlugins.onAssembly(new MaybeCreate(onSubscribe)); } + /** + * Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources provided by + * an Iterable sequence. + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the Iterable sequence of MaybeSource instances + * @return the new Flowable instance + */ + public static Flowable concat(Iterable> sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new MaybeConcatIterable(sources)); + } + + /** + * Returns a Flowable that emits the items emitted by two MaybeSources, one after the other. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source1 + * a MaybeSource to be concatenated + * @param source2 + * a MaybeSource to be concatenated + * @return a Flowable that emits items emitted by the two source MaybeSources, one after the other. + * @see ReactiveX operators documentation: Concat + */ + @SuppressWarnings("unchecked") + public static Flowable concat(MaybeSource source1, MaybeSource source2) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + return concatArray(source1, source2); + } + + /** + * Returns a Flowable that emits the items emitted by three MaybeSources, one after the other. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source1 + * a MaybeSource to be concatenated + * @param source2 + * a MaybeSource to be concatenated + * @param source3 + * a MaybeSource to be concatenated + * @return a Flowable that emits items emitted by the three source MaybeSources, one after the other. + * @see ReactiveX operators documentation: Concat + */ + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource source1, MaybeSource source2, MaybeSource source3) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + return concatArray(source1, source2, source3); + } + + /** + * Returns a Flowable that emits the items emitted by four MaybeSources, one after the other. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source1 + * a MaybeSource to be concatenated + * @param source2 + * a MaybeSource to be concatenated + * @param source3 + * a MaybeSource to be concatenated + * @param source4 + * a MaybeSource to be concatenated + * @return a Flowable that emits items emitted by the four source MaybeSources, one after the other. + * @see ReactiveX operators documentation: Concat + */ + @SuppressWarnings("unchecked") + public static Flowable concat( + MaybeSource source1, MaybeSource source2, MaybeSource source3, MaybeSource source4) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + return concatArray(source1, source2, source3, source4); + } + + /** + * Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources provided by + * a Publisher sequence. + *
+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the Publisher of MaybeSource instances + * @return the new Flowable instance + */ + public static Flowable concat(Publisher> sources) { + return concat(sources, 2); + } + + /** + * Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources provided by + * a Publisher sequence. + *
+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the Publisher of MaybeSource instances + * @param prefetch the number of MaybeSources to prefetch from the Publisher + * @return the new Flowable instance + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static Flowable concat(Publisher> sources, int prefetch) { + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, MaybeToPublisher.instance(), prefetch, ErrorMode.IMMEDIATE)); + } + + /** + * Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources in the array. + *
+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the Publisher of MaybeSource instances + * @return the new Flowable instance + */ + @SuppressWarnings("unchecked") + public static Flowable concatArray(MaybeSource... sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); + if (sources.length == 0) { + return Flowable.empty(); + } + if (sources.length == 1) { + return RxJavaPlugins.onAssembly(new MaybeToFlowable((MaybeSource)sources[0])); + } + return RxJavaPlugins.onAssembly(new MaybeConcatArray(sources)); + } + /** * Calls a Callable for each individual MaybeObserver to return the actual MaybeSource source to * be subscribe to. @@ -151,7 +347,7 @@ public static Maybe error(Throwable exception) { * @param supplier * a Callable factory to return a Throwable for each individual Subscriber * @param - * the type of the items (ostensibly) emitted by the Publisher + * the type of the items (ostensibly) emitted by the Maybe * @return a Maybe that invokes the {@link MaybeObserver}'s {@link MaybeObserver#onError onError} method when * the MaybeObserver subscribes to it * @see ReactiveX operators documentation: Throw @@ -244,20 +440,216 @@ public static Maybe just(T item) { ObjectHelper.requireNonNull(item, "item is null"); return RxJavaPlugins.onAssembly(new MaybeJust(item)); } + + /** + * Merges an Iterable sequence of MaybeSource instances into a single Flowable sequence, + * running all MaybeSources at once. + *
+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the common and resulting value type + * @param sources the Iterable sequence of MaybeSource sources + * @return the new Flowable instance + * @since 2.0 + */ + public static Flowable merge(Iterable> sources) { + return merge(Flowable.fromIterable(sources)); + } + + /** + * Merges a Flowable sequence of MaybeSource instances into a single Flowable sequence, + * running all MaybeSources at once. + *
+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the common and resulting value type + * @param sources the Flowable sequence of MaybeSource sources + * @return the new Flowable instance + * @since 2.0 + */ + public static Flowable merge(Publisher> sources) { + return merge(sources, Integer.MAX_VALUE); + } + + /** + * Merges a Flowable sequence of MaybeSource instances into a single Flowable sequence, + * running at most maxConcurrency MaybeSources at once. + *
+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the common and resulting value type + * @param sources the Flowable sequence of MaybeSource sources + * @param maxConcurrency the maximum number of concurrently running MaybeSources + * @return the new Flowable instance + * @since 2.0 + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static Flowable merge(Publisher> sources, int maxConcurrency) { + return RxJavaPlugins.onAssembly(new FlowableFlatMap(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize())); + } + + /** + * Flattens a {@code Single} that emits a {@code Single} into a single {@code Single} that emits the item + * emitted by the nested {@code Single}, without any transformation. + *

+ * + *

+ *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the value type of the sources and the output + * @param source + * a {@code Single} that emits a {@code Single} + * @return a {@code Single} that emits the item that is the result of flattening the {@code Single} emitted + * by {@code source} + * @see ReactiveX operators documentation: Merge + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static Maybe merge(MaybeSource> source) { + return new MaybeFlatten(source, Functions.identity()); + } + + /** + * Flattens two Singles into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Singles so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source1 + * a Single to be merged + * @param source2 + * a Single to be merged + * @return a Flowable that emits all of the items emitted by the source Singles + * @see ReactiveX operators documentation: Merge + */ + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource source1, MaybeSource source2 + ) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + return mergeArray(source1, source2); + } + + /** + * Flattens three Singles into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source1 + * a Single to be merged + * @param source2 + * a Single to be merged + * @param source3 + * a Single to be merged + * @return a Flowable that emits all of the items emitted by the source Singles + * @see ReactiveX operators documentation: Merge + */ + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource source1, MaybeSource source2, + MaybeSource source3 + ) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + return mergeArray(source1, source2, source3); + } + /** + * Flattens four Singles into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source1 + * a Single to be merged + * @param source2 + * a Single to be merged + * @param source3 + * a Single to be merged + * @param source4 + * a Single to be merged + * @return a Flowable that emits all of the items emitted by the source Singles + * @see ReactiveX operators documentation: Merge + */ + @SuppressWarnings("unchecked") + public static Flowable merge( + MaybeSource source1, MaybeSource source2, + MaybeSource source3, MaybeSource source4 + ) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + return mergeArray(source1, source2, source3, source4); + } + + /** + * Merges an array sequence of MaybeSource instances into a single Flowable sequence, + * running all MaybeSources at once. + *
+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the common and resulting value type + * @param sources the array sequence of MaybeSource sources + * @return the new Flowable instance + * @since 2.0 + */ + @SuppressWarnings("unchecked") + public static Flowable mergeArray(MaybeSource... sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); + if (sources.length == 0) { + return Flowable.empty(); + } + if (sources.length == 1) { + return RxJavaPlugins.onAssembly(new MaybeToFlowable((MaybeSource)sources[0])); + } + return RxJavaPlugins.onAssembly(new MaybeMergeArray(sources)); + } + + /** * Returns a Maybe that never sends any items or notifications to an {@link MaybeObserver}. *

* *

- * This Publisher is useful primarily for testing purposes. + * This Maybe is useful primarily for testing purposes. *

*
Scheduler:
*
{@code never} does not operate by default on a particular {@link Scheduler}.
*
* * @param - * the type of items (not) emitted by the Publisher + * the type of items (not) emitted by the Maybe * @return a Maybe that never emits any items or sends any notifications to an {@link MaybeObserver} * @see ReactiveX operators documentation: Never */ @@ -347,7 +739,28 @@ public final Maybe cast(final Class clazz) { public final Maybe compose(Function, ? extends MaybeSource> transformer) { return wrap(to(transformer)); } - + + /** + * Returns a Maybe that is based on applying a specified function to the item emitted by the source Maybe, + * where that function returns a MaybeSource. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ *

Note that flatMap and concatMap for Maybe is the same operation. + * @param the result value type + * @param mapper + * a function that, when applied to the item emitted by the source Maybe, returns a MaybeSource + * @return the Maybe returned from {@code func} when applied to the item emitted by the source Maybe + * @see ReactiveX operators documentation: FlatMap + */ + public final Maybe concatMap(Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new MaybeFlatten(this, mapper)); + } + /** * Registers an {@link Action} to be called when this Maybe invokes either * {@link MaybeObserver#onComplete onSuccess}, @@ -360,7 +773,7 @@ public final Maybe compose(Function, ? extends MaybeSour * * * @param onAfterTerminate - * an {@link Action} to be invoked when the source Publisher finishes + * an {@link Action} to be invoked when the source Maybe finishes * @return a Maybe that emits the same items as the source Maybe, then invokes the * {@link Action} * @see ReactiveX operators documentation: Do @@ -387,7 +800,7 @@ public final Maybe doAfterTerminate(Action onAfterTerminate) { * @param onDispose the runnable called when the subscription is cancelled (disposed) * @return the new Maybe instance */ - public final Maybe doOnDispose(final Action onDispose) { + public final Maybe doOnDispose(Action onDispose) { return RxJavaPlugins.onAssembly(new MaybePeek(this, Functions.emptyConsumer(), // onSubscribe Functions.emptyConsumer(), // onSuccess @@ -399,7 +812,7 @@ public final Maybe doOnDispose(final Action onDispose) { } /** - * Modifies the source Publisher so that it invokes an action when it calls {@code onCompleted}. + * Modifies the source Maybe so that it invokes an action when it calls {@code onCompleted}. *

* *

@@ -408,7 +821,7 @@ public final Maybe doOnDispose(final Action onDispose) { *
* * @param onComplete - * the action to invoke when the source Publisher calls {@code onCompleted} + * the action to invoke when the source Maybe calls {@code onCompleted} * @return the new Maybe with the side-effecting behavior applied * @see ReactiveX operators documentation: Do */ @@ -434,7 +847,7 @@ public final Maybe doOnComplete(Action onComplete) { * @param onError the consumer called with the success value of onError * @return the new Maybe instance */ - public final Maybe doOnError(final Consumer onError) { + public final Maybe doOnError(Consumer onError) { return RxJavaPlugins.onAssembly(new MaybePeek(this, Functions.emptyConsumer(), // onSubscribe Functions.emptyConsumer(), // onSuccess @@ -444,6 +857,25 @@ public final Maybe doOnError(final Consumer onError) { Functions.EMPTY_ACTION // dispose )); } + + /** + * Calls the given onEvent callback with the (success value, null) for an onSuccess, (null, throwabe) for + * an onError or (null, null) for an onComplete signal from this Maybe before delivering said + * signal to the downstream. + *

+ * Exceptions thrown from the callback will override the event so the downstream receives the + * error instead of the original signal. + *

+ *
Scheduler:
+ *
{@code doOnEvent} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onEvent the callback to call with the terminal event tuple + * @return the new Maybe instance + */ + public final Maybe doOnEvent(BiConsumer onEvent) { + ObjectHelper.requireNonNull(onEvent, "onEvent is null"); + return RxJavaPlugins.onAssembly(new MaybeDoOnEvent(this, onEvent)); + } /** * Calls the shared consumer with the Disposable sent through the onSubscribe for each @@ -455,7 +887,7 @@ public final Maybe doOnError(final Consumer onError) { * @param onSubscribe the consumer called with the Disposable sent via onSubscribe * @return the new Maybe instance */ - public final Maybe doOnSubscribe(final Consumer onSubscribe) { + public final Maybe doOnSubscribe(Consumer onSubscribe) { return RxJavaPlugins.onAssembly(new MaybePeek(this, ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"), Functions.emptyConsumer(), // onSuccess @@ -476,7 +908,7 @@ public final Maybe doOnSubscribe(final Consumer onSubscri * @param onSuccess the consumer called with the success value of onSuccess * @return the new Maybe instance */ - public final Maybe doOnSuccess(final Consumer onSuccess) { + public final Maybe doOnSuccess(Consumer onSuccess) { return RxJavaPlugins.onAssembly(new MaybePeek(this, Functions.emptyConsumer(), // onSubscribe ObjectHelper.requireNonNull(onSuccess, "onSubscribe is null"), @@ -511,7 +943,7 @@ public final Maybe filter(Predicate predicate) { } /** - * Returns a Maybe that is based on applying a specified function to the item emitted by the source Single, + * Returns a Maybe that is based on applying a specified function to the item emitted by the source Maybe, * where that function returns a MaybeSource. *

* @@ -519,6 +951,7 @@ public final Maybe filter(Predicate predicate) { *

Scheduler:
*
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
* + *

Note that flatMap and concatMap for Maybe is the same operation. * * @param the result value type * @param mapper @@ -528,7 +961,7 @@ public final Maybe filter(Predicate predicate) { */ public final Maybe flatMap(Function> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - return RxJavaPlugins.onAssembly(new MaybeFlatMap(this, mapper)); + return RxJavaPlugins.onAssembly(new MaybeFlatten(this, mapper)); } /** @@ -557,7 +990,7 @@ public final Maybe flatMap( Function> onSuccessMapper, Function> onErrorMapper, Callable> onCompleteSupplier) { - ObjectHelper.requireNonNull(onSuccessMapper, "onNextMapper is null"); + ObjectHelper.requireNonNull(onSuccessMapper, "onSuccessMapper is null"); ObjectHelper.requireNonNull(onErrorMapper, "onErrorMapper is null"); ObjectHelper.requireNonNull(onCompleteSupplier, "onCompleteSupplier is null"); return new MaybeFlatMapNotification(this, onSuccessMapper, onErrorMapper, onCompleteSupplier); @@ -573,7 +1006,7 @@ public final Maybe flatMap( * * * @return an empty Maybe that only calls {@code onComplete} or {@code onError}, based on which one is - * called by the source Publisher + * called by the source Maybe * @see ReactiveX operators documentation: IgnoreElements */ @SchedulerSupport(SchedulerSupport.NONE) @@ -728,6 +1161,103 @@ public final Single toSingle() { return RxJavaPlugins.onAssembly(new MaybeToSingle(this)); } + /** + * Subscribes to a Maybe and ignores {@code onSuccess} and {@code onComplete} emissions. + *

+ * If the Maybe emits an error, it is routed to the RxJavaPlugins.onError handler. + *

+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a {@link Disposable} reference with which the caller can stop receiving items before + * the Maybe has finished sending them + * @see ReactiveX operators documentation: Subscribe + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Disposable subscribe() { + return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + } + + /** + * Subscribes to a Maybe and provides a callback to handle the items it emits. + *

+ * If the Flowable emits an error, it is routed to the RxJavaPlugins.onError handler. + *

+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onSuccess + * the {@code Consumer} you have designed to accept a success value from the Maybe + * @return a {@link Disposable} reference with which the caller can stop receiving items before + * the Maybe has finished sending them + * @throws NullPointerException + * if {@code onSuccess} is null + * @see ReactiveX operators documentation: Subscribe + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Disposable subscribe(Consumer onSuccess) { + return subscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + } + + /** + * Subscribes to a Maybe and provides callbacks to handle the items it emits and any error + * notification it issues. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onSuccess + * the {@code Consumer} you have designed to accept a success value from the Maybe + * @param onError + * the {@code Consumer} you have designed to accept any error notification from the + * Maybe + * @return a {@link Subscription} reference with which the caller can stop receiving items before + * the Maybe has finished sending them + * @see ReactiveX operators documentation: Subscribe + * @throws IllegalArgumentException + * if {@code onSuccess} is null, or + * if {@code onError} is null + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Disposable subscribe(Consumer onSuccess, Consumer onError) { + return subscribe(onSuccess, onError, Functions.EMPTY_ACTION); + } + + /** + * Subscribes to a Maybe and provides callbacks to handle the items it emits and any error or + * completion notification it issues. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onSuccess + * the {@code Consumer} you have designed to accept a success value from the Maybe + * @param onError + * the {@code Consumer} you have designed to accept any error notification from the + * Maybe + * @param onComplete + * the {@code Action} you have designed to accept a completion notification from the + * Maybe + * @return a {@link Disposable} reference with which the caller can stop receiving items before + * the Maybe has finished sending them + * @throws IllegalArgumentException + * if {@code onSuccess} is null, or + * if {@code onError} is null, or + * if {@code onComplete} is null + * @see ReactiveX operators documentation: Subscribe + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Disposable subscribe(Consumer onSuccess, Consumer onError, + Action onComplete) { + return subscribeWith(new MaybeCallbackObserver(onSuccess, onError, onComplete)); + } + + + @Override public final void subscribe(MaybeObserver observer) { ObjectHelper.requireNonNull(observer, "observer is null"); @@ -775,6 +1305,31 @@ public final Maybe subscribeOn(Scheduler scheduler) { return RxJavaPlugins.onAssembly(new MaybeSubscribeOn(this, scheduler)); } + /** + * Subscribes a given MaybeObserver (subclass) to this Maybe and returns the given + * MaybeObserver as is. + *

Usage example: + *


+     * Maybe source = Maybe.just(1);
+     * CompositeDisposable composite = new CompositeDisposable();
+     * 
+     * MaybeObserver<Integer> ms = new MaybeObserver<>() {
+     *     // ...
+     * };
+     * 
+     * composite.add(source.subscribeWith(ms));
+     * 
+ * @param the type of the MaybeObserver to use and return + * @param observer the MaybeObserver (subclass) to use and return, not null + * @return the input {@code subscriber} + * @throws NullPointerException if {@code subscriber} is null + */ + public final > E subscribeWith(E observer) { + subscribe(observer); + return observer; + } + + // ------------------------------------------------------------------ // Test helper // ------------------------------------------------------------------ diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index c04b62c89f..b437a6b322 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -106,7 +106,7 @@ public static Observable ambArray(ObservableSource... source int len = sources.length; if (len == 0) { return empty(); - } else + } if (len == 1) { return (Observable)wrap(sources[0]); } @@ -208,7 +208,7 @@ public static Observable combineLatest(Iterable combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // the queue holds a pair of values so we need to double the capacity int s = bufferSize << 1; @@ -270,7 +270,7 @@ public static Observable combineLatest(ObservableSource[] Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // the queue holds a pair of values so we need to double the capacity int s = bufferSize << 1; @@ -719,7 +719,7 @@ public static Observable combineLatestDelayError(Function Observable combineLatestDelayError(ObservableSource[] sources, Function combiner, int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(combiner, "combiner is null"); if (sources.length == 0) { return empty(); @@ -788,7 +788,7 @@ public static Observable combineLatestDelayError(Iterable combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // the queue holds a pair of values so we need to double the capacity int s = bufferSize << 1; @@ -3006,7 +3006,7 @@ public static Observable sequenceEqual(ObservableSource(source1, source2, isEqual, bufferSize)); } @@ -3166,7 +3166,7 @@ public static Observable switchOnNextDelayError(ObservableSource Observable switchOnNextDelayError(ObservableSource> sources, int prefetch) { ObjectHelper.requireNonNull(sources, "sources is null"); - verifyPositive(prefetch, "prefetch"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ObservableSwitchMap(sources, Functions.identity(), prefetch, true)); } @@ -3302,36 +3302,6 @@ public static Observable using(Callable resourceSupplier, return RxJavaPlugins.onAssembly(new ObservableUsing(resourceSupplier, sourceSupplier, disposer, eager)); } - /** - * Validate that the given value is positive or report an IllegalArgumentException with - * the parameter name. - * @param value the value to validate - * @param paramName the parameter name of the value - * @return value - * @throws IllegalArgumentException if bufferSize <= 0 - */ - protected static int verifyPositive(int value, String paramName) { - if (value <= 0) { - throw new IllegalArgumentException(paramName + " > 0 required but it was " + value); - } - return value; - } - - /** - * Validate that the given value is positive or report an IllegalArgumentException with - * the parameter name. - * @param value the value to validate - * @param paramName the parameter name of the value - * @return value - * @throws IllegalArgumentException if bufferSize <= 0 - */ - protected static long verifyPositive(long value, String paramName) { - if (value <= 0L) { - throw new IllegalArgumentException(paramName + " > 0 required but it was " + value); - } - return value; - } - /** * Wraps an ObservableSource into an Observable if not already an Observable. * @@ -4121,7 +4091,7 @@ public static Observable zipArray(Function z return empty(); } ObjectHelper.requireNonNull(zipper, "zipper is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableZip(sources, null, zipper, bufferSize, delayError)); } @@ -4176,7 +4146,7 @@ public static Observable zipIterable(Iterable(null, sources, zipper, bufferSize, delayError)); } @@ -4369,7 +4339,7 @@ public final Iterable blockingIterable() { * @see ReactiveX documentation: To */ public final Iterable blockingIterable(int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return new BlockingObservableIterable(this, bufferSize); } @@ -4689,8 +4659,8 @@ public final Observable> buffer(int count, int skip) { */ @SchedulerSupport(SchedulerSupport.NONE) public final > Observable buffer(int count, int skip, Callable bufferSupplier) { - verifyPositive(count, "count"); - verifyPositive(skip, "skip"); + ObjectHelper.verifyPositive(count, "count"); + ObjectHelper.verifyPositive(skip, "skip"); ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null"); return RxJavaPlugins.onAssembly(new ObservableBuffer(this, count, skip, bufferSupplier)); } @@ -4949,7 +4919,7 @@ public final > Observable buffer( ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null"); - verifyPositive(count, "count"); + ObjectHelper.verifyPositive(count, "count"); return RxJavaPlugins.onAssembly(new ObservableBufferTimed(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize)); } @@ -5457,7 +5427,7 @@ public final Observable concatMap(Function Observable concatMap(Function> mapper, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - verifyPositive(prefetch, "prefetch"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -5512,7 +5482,7 @@ public final Observable concatMapDelayError(Function Observable concatMapDelayError(Function> mapper, int prefetch, boolean tillTheEnd) { - verifyPositive(prefetch, "prefetch"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -5569,8 +5539,8 @@ public final Observable concatMapEager(Function Observable concatMapEager(Function> mapper, int maxConcurrency, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - verifyPositive(maxConcurrency, "maxConcurrency"); - verifyPositive(prefetch, "prefetch"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ObservableConcatMapEager(this, mapper, ErrorMode.IMMEDIATE, maxConcurrency, prefetch)); } @@ -6753,8 +6723,8 @@ public final Observable flatMap(Function Observable flatMap(Function> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - verifyPositive(maxConcurrency, "maxConcurrency"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -7414,7 +7384,7 @@ public final Observable> groupBy(Function(this, keySelector, valueSelector, bufferSize, delayError)); } @@ -7779,7 +7749,7 @@ public final Observable observeOn(Scheduler scheduler, boolean delayError) { @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize)); } @@ -8059,7 +8029,7 @@ public final Observable publish(Function, ? extends */ @SchedulerSupport(SchedulerSupport.NONE) public final Observable publish(Function, ? extends ObservableSource> selector, int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(selector, "selector is null"); return ObservablePublish.create(this, selector, bufferSize); } @@ -8083,7 +8053,7 @@ public final Observable publish(Function, ? extends */ @SchedulerSupport(SchedulerSupport.NONE) public final ConnectableObservable publish(int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return ObservablePublish.create(this, bufferSize); } @@ -8448,7 +8418,7 @@ public final Observable replay(Function, ? extends */ @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable replay(Function, ? extends ObservableSource> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(selector, "selector is null"); return ObservableReplay.multicastSelector( ObservableInternalHelper.replayCallable(this, bufferSize, time, unit, scheduler), selector); @@ -8660,7 +8630,7 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit */ @SchedulerSupport(SchedulerSupport.CUSTOM) public final ConnectableObservable replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return ObservableReplay.create(this, time, unit, scheduler, bufferSize); @@ -9521,7 +9491,7 @@ public final Observable skipLast(long time, TimeUnit unit, Scheduler schedule public final Observable skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // the internal buffer holds pairs of (timestamp, value) so double the default buffer size int s = bufferSize << 1; return RxJavaPlugins.onAssembly(new ObservableSkipLastTimed(this, time, unit, scheduler, s, delayError)); @@ -9719,7 +9689,7 @@ public final Observable startWithArray(T... items) { *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
* * - * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before + * @return a {@link Disposable} reference with which the caller can stop receiving items before * the ObservableSource has finished sending them * @see ReactiveX operators documentation: Subscribe */ @@ -9739,7 +9709,7 @@ public final Disposable subscribe() { * * @param onNext * the {@code Consumer} you have designed to accept emissions from the ObservableSource - * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before + * @return a {@link Disposable} reference with which the caller can stop receiving items before * the ObservableSource has finished sending them * @throws NullPointerException * if {@code onNext} is null @@ -9763,7 +9733,7 @@ public final Disposable subscribe(Consumer onNext) { * @param onError * the {@code Consumer} you have designed to accept any error notification from the * ObservableSource - * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before + * @return a {@link Disposable} reference with which the caller can stop receiving items before * the ObservableSource has finished sending them * @see ReactiveX operators documentation: Subscribe * @throws IllegalArgumentException @@ -9791,7 +9761,7 @@ public final Disposable subscribe(Consumer onNext, Consumer onNext, Consumer Observable switchMap(Function Observable switchMap(Function> mapper, int bufferSize) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -10051,7 +10021,7 @@ public final Observable switchMapDelayError(Function Observable switchMapDelayError(Function> mapper, int bufferSize) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -10284,7 +10254,7 @@ public final Observable takeLast(long count, long time, TimeUnit unit, Schedu public final Observable takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (count < 0) { throw new IndexOutOfBoundsException("count >= 0 required but it was " + count); } @@ -11215,7 +11185,7 @@ public final Observable> toList() { */ @SchedulerSupport(SchedulerSupport.NONE) public final Observable> toList(final int capacityHint) { - verifyPositive(capacityHint, "capacityHint"); + ObjectHelper.verifyPositive(capacityHint, "capacityHint"); return RxJavaPlugins.onAssembly(new ObservableToList>(this, capacityHint)); } @@ -11721,9 +11691,9 @@ public final Observable> window(long count, long skip) { */ @SchedulerSupport(SchedulerSupport.NONE) public final Observable> window(long count, long skip, int bufferSize) { - verifyPositive(count, "count"); - verifyPositive(skip, "skip"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(count, "count"); + ObjectHelper.verifyPositive(skip, "skip"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableWindow(this, count, skip, bufferSize)); } @@ -11811,9 +11781,9 @@ public final Observable> window(long timespan, long timeskip, Time */ @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize) { - verifyPositive(timespan, "timespan"); - verifyPositive(timeskip, "timeskip"); - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(timespan, "timespan"); + ObjectHelper.verifyPositive(timeskip, "timeskip"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.requireNonNull(unit, "unit is null"); return RxJavaPlugins.onAssembly(new ObservableWindowTimed(this, timespan, timeskip, unit, scheduler, Long.MAX_VALUE, bufferSize, false)); @@ -12041,10 +12011,10 @@ public final Observable> window(long timespan, TimeUnit unit, public final Observable> window( long timespan, TimeUnit unit, Scheduler scheduler, long count, boolean restart, int bufferSize) { - verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.requireNonNull(unit, "unit is null"); - verifyPositive(count, "count"); + ObjectHelper.verifyPositive(count, "count"); return RxJavaPlugins.onAssembly(new ObservableWindowTimed(this, timespan, timespan, unit, scheduler, count, bufferSize, restart)); } diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index c1b3db2e34..6a78f10f7e 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -124,11 +124,29 @@ public static Flowable concat(Iterable Flowable concat(Publisher> sources) { - return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), 2, ErrorMode.IMMEDIATE)); + return concat(sources, 2); } - + + /** + * Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by + * a Publisher sequence and prefetched by the specified amount. + *
+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the Publisher of SingleSource instances + * @param prefetch the number of SingleSources to prefetch from the Publisher + * @return the new Flowable instance + * @since 2.0 + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static Flowable concat(Publisher> sources, int prefetch) { + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE)); + } + /** * Returns a Flowable that emits the items emitted by two Singles, one after the other. *

@@ -218,6 +236,23 @@ public static Flowable concat( return concat(Flowable.fromArray(source1, source2, source3, source4)); } + /** + * Concatenate the single values, in a non-overlapping fashion, of the Single sources provided in + * an array. + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources the array of SingleSource instances + * @return the new Flowable instance + * @since 2.0 + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static Flowable concatArray(SingleSource... sources) { + return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY)); + } + /** * Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world. *

diff --git a/src/main/java/io/reactivex/disposables/Disposables.java b/src/main/java/io/reactivex/disposables/Disposables.java index e2bbe95703..5f2660f1bf 100644 --- a/src/main/java/io/reactivex/disposables/Disposables.java +++ b/src/main/java/io/reactivex/disposables/Disposables.java @@ -32,18 +32,6 @@ private Disposables() { throw new IllegalStateException("No instances!"); } - /** - * Construct a Disposable by wrapping a Runnable that is - * executed exactly once when the Disposable is disposed. - * @param run the Runnable to wrap - * @return the new Disposable instance - * @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity - */ - @Deprecated - public static Disposable from(Runnable run) { - return fromRunnable(run); - } - /** * Construct a Disposable by wrapping a Runnable that is * executed exactly once when the Disposable is disposed. @@ -55,18 +43,6 @@ public static Disposable fromRunnable(Runnable run) { return new RunnableDisposable(run); } - /** - * Construct a Disposable by wrapping a Action that is - * executed exactly once when the Disposable is disposed. - * @param run the Action to wrap - * @return the new Disposable instance - * @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity - */ - @Deprecated - public static Disposable from(Action run) { - return fromAction(run); - } - /** * Construct a Disposable by wrapping a Action that is * executed exactly once when the Disposable is disposed. @@ -78,31 +54,6 @@ public static Disposable fromAction(Action run) { return new ActionDisposable(run); } - /** - * Construct a Disposable by wrapping a Future that is - * cancelled exactly once when the Disposable is disposed. - * @param future the Future to wrap - * @return the new Disposable instance - * @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity - */ - @Deprecated - public static Disposable from(Future future) { - return fromFuture(future, true); - } - - /** - * Construct a Disposable by wrapping a Runnable that is - * executed exactly once when the Disposable is disposed. - * @param future the Runnable to wrap - * @param allowInterrupt if true, the future cancel happens via Future.cancel(true) - * @return the new Disposable instance - * @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity - */ - @Deprecated - public static Disposable from(Future future, boolean allowInterrupt) { - return fromFuture(future, allowInterrupt); - } - /** * Construct a Disposable by wrapping a Future that is * cancelled exactly once when the Disposable is disposed. @@ -126,18 +77,6 @@ public static Disposable fromFuture(Future future, boolean allowInterrupt) { return new FutureDisposable(future, allowInterrupt); } - /** - * Construct a Disposable by wrapping a Subscription that is - * cancelled exactly once when the Disposable is disposed. - * @param subscription the Runnable to wrap - * @return the new Disposable instance - * @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity - */ - @Deprecated - public static Disposable from(Subscription subscription) { - return fromSubscription(subscription); - } - /** * Construct a Disposable by wrapping a Subscription that is * cancelled exactly once when the Disposable is disposed. diff --git a/src/main/java/io/reactivex/internal/functions/ObjectHelper.java b/src/main/java/io/reactivex/internal/functions/ObjectHelper.java index c094e5acb8..0a79f8f8bd 100644 --- a/src/main/java/io/reactivex/internal/functions/ObjectHelper.java +++ b/src/main/java/io/reactivex/internal/functions/ObjectHelper.java @@ -96,4 +96,35 @@ public boolean test(Object o1, Object o2) { public static BiPredicate equalsPredicate() { return (BiPredicate)EQUALS; } + + /** + * Validate that the given value is positive or report an IllegalArgumentException with + * the parameter name. + * @param value the value to validate + * @param paramName the parameter name of the value + * @return value + * @throws IllegalArgumentException if bufferSize <= 0 + */ + public static int verifyPositive(int value, String paramName) { + if (value <= 0) { + throw new IllegalArgumentException(paramName + " > 0 required but it was " + value); + } + return value; + } + + /** + * Validate that the given value is positive or report an IllegalArgumentException with + * the parameter name. + * @param value the value to validate + * @param paramName the parameter name of the value + * @return value + * @throws IllegalArgumentException if bufferSize <= 0 + */ + public static long verifyPositive(long value, String paramName) { + if (value <= 0L) { + throw new IllegalArgumentException(paramName + " > 0 required but it was " + value); + } + return value; + } + } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java new file mode 100644 index 0000000000..74856a91c3 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java @@ -0,0 +1,118 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Signals the event of the first MaybeSource that signals. + * + * @param the value type emitted + */ +public final class MaybeAmbArray extends Maybe { + + final MaybeSource[] sources; + + public MaybeAmbArray(MaybeSource[] sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + + AmbMaybeObserver parent = new AmbMaybeObserver(observer); + observer.onSubscribe(parent); + + for (MaybeSource s : sources) { + if (parent.isDisposed()) { + return; + } + + if (s == null) { + parent.onError(new NullPointerException("One of the MaybeSources is null")); + return; + } + + s.subscribe(parent); + } + } + + static final class AmbMaybeObserver + extends AtomicBoolean + implements MaybeObserver, Disposable { + + /** */ + private static final long serialVersionUID = -7044685185359438206L; + + final MaybeObserver actual; + + final CompositeDisposable set; + + public AmbMaybeObserver(MaybeObserver actual) { + this.actual = actual; + this.set = new CompositeDisposable(); + } + + @Override + public void dispose() { + if (compareAndSet(false, true)) { + set.dispose(); + } + } + + @Override + public boolean isDisposed() { + return get(); + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (compareAndSet(false, true)) { + set.dispose(); + + actual.onSuccess(value); + } + } + + @Override + public void onError(Throwable e) { + if (compareAndSet(false, true)) { + set.dispose(); + + actual.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (compareAndSet(false, true)) { + set.dispose(); + + actual.onComplete(); + } + } + + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java new file mode 100644 index 0000000000..0e787c30e5 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java @@ -0,0 +1,56 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.internal.operators.maybe.MaybeAmbArray.AmbMaybeObserver; + +/** + * Signals the event of the first MaybeSource that signals. + * + * @param the value type emitted + */ +public final class MaybeAmbIterable extends Maybe { + + final Iterable> sources; + + public MaybeAmbIterable(Iterable> sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + AmbMaybeObserver parent = new AmbMaybeObserver(observer); + observer.onSubscribe(parent); + + int i = 0; + for (MaybeSource s : sources) { + if (parent.isDisposed()) { + return; + } + + if (s == null) { + parent.onError(new NullPointerException("One of the MaybeSources is null")); + return; + } + + s.subscribe(parent); + i++; + } + + if (i == 0) { + observer.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserver.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserver.java new file mode 100644 index 0000000000..dff462445a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserver.java @@ -0,0 +1,100 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.MaybeObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * MaybeObserver that delegates the onSuccess, onError and onComplete method calls to callbacks. + * + * @param the value type + */ +public final class MaybeCallbackObserver +extends AtomicReference +implements MaybeObserver, Disposable { + + /** */ + private static final long serialVersionUID = -6076952298809384986L; + + final Consumer onSuccess; + + final Consumer onError; + + final Action onComplete; + + public MaybeCallbackObserver(Consumer onSuccess, Consumer onError, + Action onComplete) { + super(); + this.onSuccess = onSuccess; + this.onError = onError; + this.onComplete = onComplete; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(T value) { + lazySet(DisposableHelper.DISPOSED); + try { + onSuccess.accept(value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + @Override + public void onError(Throwable e) { + lazySet(DisposableHelper.DISPOSED); + try { + onError.accept(e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(new CompositeException(e, ex)); + } + } + + @Override + public void onComplete() { + lazySet(DisposableHelper.DISPOSED); + try { + onComplete.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java new file mode 100644 index 0000000000..423ebd7d38 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java @@ -0,0 +1,162 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.SequentialDisposable; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; + +/** + * Concatenate values of each MaybeSource provided in an array. + * + * @param the value type + */ +public final class MaybeConcatArray extends Flowable { + + final MaybeSource[] sources; + + public MaybeConcatArray(MaybeSource[] sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(Subscriber s) { + ConcatMaybeObserver parent = new ConcatMaybeObserver(s, sources); + s.onSubscribe(parent); + parent.drain(); + } + + static final class ConcatMaybeObserver + extends AtomicInteger + implements MaybeObserver, Subscription { + /** */ + private static final long serialVersionUID = 3520831347801429610L; + + final Subscriber actual; + + final AtomicLong requested; + + final AtomicReference current; + + final SequentialDisposable disposables; + + final MaybeSource[] sources; + + int index; + + long produced; + + public ConcatMaybeObserver(Subscriber actual, MaybeSource[] sources) { + this.actual = actual; + this.sources = sources; + this.requested = new AtomicLong(); + this.disposables = new SequentialDisposable(); + this.current = new AtomicReference(NotificationLite.COMPLETE); // as if a previous completed + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + disposables.dispose(); + } + + @Override + public void onSubscribe(Disposable d) { + disposables.replace(d); + } + + @Override + public void onSuccess(T value) { + current.lazySet(value); + drain(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + current.lazySet(NotificationLite.COMPLETE); + drain(); + } + + @SuppressWarnings("unchecked") + void drain() { + if (getAndIncrement() != 0) { + return; + } + + AtomicReference c = current; + Subscriber a = actual; + + for (;;) { + if (disposables.isDisposed()) { + c.lazySet(null); + return; + } + + Object o = c.get(); + + if (o != null) { + boolean goNextSource; + if (o != NotificationLite.COMPLETE) { + long p = produced; + if (p != requested.get()) { + produced = p + 1; + c.lazySet(null); + goNextSource = true; + + a.onNext((T)o); + } else { + goNextSource = false; + } + } else { + goNextSource = true; + c.lazySet(null); + } + + if (goNextSource) { + int i = index; + if (i == sources.length) { + a.onComplete(); + return; + } + index = i + 1; + + sources[i].subscribe(this); + } + } + + if (decrementAndGet() == 0) { + break; + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java new file mode 100644 index 0000000000..ef045da3dd --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java @@ -0,0 +1,191 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.Iterator; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.disposables.SequentialDisposable; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.internal.util.*; + +/** + * Concatenate values of each MaybeSource provided by an Iterable. + * + * @param the value type + */ +public final class MaybeConcatIterable extends Flowable { + + final Iterable> sources; + + public MaybeConcatIterable(Iterable> sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(Subscriber s) { + + Iterator> it; + + try { + it = ObjectHelper.requireNonNull(sources.iterator(), "The sources Iterable returned a null Iterator"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptySubscription.error(ex, s); + return; + } + + ConcatMaybeObserver parent = new ConcatMaybeObserver(s, it); + s.onSubscribe(parent); + parent.drain(); + } + + static final class ConcatMaybeObserver + extends AtomicInteger + implements MaybeObserver, Subscription { + /** */ + private static final long serialVersionUID = 3520831347801429610L; + + final Subscriber actual; + + final AtomicLong requested; + + final AtomicReference current; + + final SequentialDisposable disposables; + + final Iterator> sources; + + long produced; + + public ConcatMaybeObserver(Subscriber actual, Iterator> sources) { + this.actual = actual; + this.sources = sources; + this.requested = new AtomicLong(); + this.disposables = new SequentialDisposable(); + this.current = new AtomicReference(NotificationLite.COMPLETE); // as if a previous completed + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + disposables.dispose(); + } + + @Override + public void onSubscribe(Disposable d) { + disposables.replace(d); + } + + @Override + public void onSuccess(T value) { + current.lazySet(value); + drain(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + current.lazySet(NotificationLite.COMPLETE); + drain(); + } + + @SuppressWarnings("unchecked") + void drain() { + if (getAndIncrement() != 0) { + return; + } + + AtomicReference c = current; + Subscriber a = actual; + + for (;;) { + if (disposables.isDisposed()) { + c.lazySet(null); + return; + } + + Object o = c.get(); + + if (o != null) { + boolean goNextSource; + if (o != NotificationLite.COMPLETE) { + long p = produced; + if (p != requested.get()) { + produced = p + 1; + c.lazySet(null); + goNextSource = true; + + a.onNext((T)o); + } else { + goNextSource = false; + } + } else { + goNextSource = true; + c.lazySet(null); + } + + if (goNextSource) { + boolean b; + + try { + b = sources.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (b) { + MaybeSource source; + + try { + source = ObjectHelper.requireNonNull(sources.next(), "The source Iterator returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + source.subscribe(this); + } else { + a.onComplete(); + } + } + } + + if (decrementAndGet() == 0) { + break; + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnEvent.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnEvent.java new file mode 100644 index 0000000000..6e011b0425 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnEvent.java @@ -0,0 +1,116 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.BiConsumer; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * Calls a BiConsumer with the success, error values of the upstream Maybe or with two nulls if + * the Maybe completed. + * + * @param the value type + */ +public final class MaybeDoOnEvent extends AbstractMaybeWithUpstream { + + final BiConsumer onEvent; + + public MaybeDoOnEvent(MaybeSource source, BiConsumer onEvent) { + super(source); + this.onEvent = onEvent; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new DoOnEventMaybeObserver(observer, onEvent)); + } + + static final class DoOnEventMaybeObserver implements MaybeObserver, Disposable { + final MaybeObserver actual; + + final BiConsumer onEvent; + + Disposable d; + + public DoOnEventMaybeObserver(MaybeObserver actual, BiConsumer onEvent) { + this.actual = actual; + this.onEvent = onEvent; + } + + @Override + public void dispose() { + d.dispose(); + d = DisposableHelper.DISPOSED; + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + d = DisposableHelper.DISPOSED; + + try { + onEvent.accept(value, null); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + + actual.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + d = DisposableHelper.DISPOSED; + + try { + onEvent.accept(null, e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + e = new CompositeException(e, ex); + } + + actual.onError(e); + } + + @Override + public void onComplete() { + d = DisposableHelper.DISPOSED; + + try { + onEvent.accept(null, null); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + + actual.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMap.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatten.java similarity index 96% rename from src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMap.java rename to src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatten.java index 0da224e6ea..75ee268747 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatten.java @@ -28,11 +28,11 @@ * @param the source value type * @param the result value type */ -public final class MaybeFlatMap extends AbstractMaybeWithUpstream { +public final class MaybeFlatten extends AbstractMaybeWithUpstream { final Function> mapper; - public MaybeFlatMap(MaybeSource source, Function> mapper) { + public MaybeFlatten(MaybeSource source, Function> mapper) { super(source); this.mapper = mapper; } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeMergeArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMergeArray.java new file mode 100644 index 0000000000..194604f45e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMergeArray.java @@ -0,0 +1,451 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Run all MaybeSources of an array at once and signal their values as they become available. + * + * @param the value type + */ +public final class MaybeMergeArray extends Flowable { + + final MaybeSource[] sources; + + public MaybeMergeArray(MaybeSource[] sources) { + this.sources = sources; + } + + @Override + protected void subscribeActual(Subscriber s) { + MaybeSource[] maybes = sources; + int n = maybes.length; + + SimpleQueueWithConsumerIndex queue; + + if (n <= bufferSize()) { + queue = new MpscFillOnceSimpleQueue(n); + } else { + queue = new ClqSimpleQueue(); + } + MergeMaybeObserver parent = new MergeMaybeObserver(s, n, queue); + + s.onSubscribe(parent); + + AtomicThrowable e = parent.error; + + for (MaybeSource source : maybes) { + if (parent.isCancelled() || e.get() != null) { + return; + } + + source.subscribe(parent); + } + } + + static final class MergeMaybeObserver + extends BasicIntQueueSubscription implements MaybeObserver { + + /** */ + private static final long serialVersionUID = -660395290758764731L; + + final Subscriber actual; + + final CompositeDisposable set; + + final AtomicLong requested; + + final SimpleQueueWithConsumerIndex queue; + + final AtomicThrowable error; + + final int sourceCount; + + volatile boolean cancelled; + + boolean outputFused; + + long consumed; + + public MergeMaybeObserver(Subscriber actual, int sourceCount, SimpleQueueWithConsumerIndex queue) { + this.actual = actual; + this.sourceCount = sourceCount; + this.set = new CompositeDisposable(); + this.requested = new AtomicLong(); + this.error = new AtomicThrowable(); + this.queue = queue; + } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @SuppressWarnings("unchecked") + @Override + public T poll() throws Exception { + for (;;) { + Object o = queue.poll(); + if (o != NotificationLite.COMPLETE) { + return (T)o; + } + } + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + set.dispose(); + if (getAndIncrement() == 0) { + queue.clear(); + } + } + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + queue.offer(value); + drain(); + } + + @Override + public void onError(Throwable e) { + if (error.addThrowable(e)) { + set.dispose(); + queue.offer(NotificationLite.COMPLETE); + drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + queue.offer(NotificationLite.COMPLETE); + drain(); + } + + boolean isCancelled() { + return cancelled; + } + + @SuppressWarnings("unchecked") + void drainNormal() { + int missed = 1; + Subscriber a = actual; + SimpleQueueWithConsumerIndex q = queue; + long e = consumed; + + for (;;) { + + long r = requested.get(); + + while (e != r) { + if (cancelled) { + q.clear(); + return; + } + + Throwable ex = error.get(); + if (ex != null) { + q.clear(); + a.onError(error.terminate()); + return; + } + + if (q.consumerIndex() == sourceCount) { + a.onComplete(); + return; + } + + Object v = q.poll(); + + if (v == null) { + break; + } + + if (v != NotificationLite.COMPLETE) { + a.onNext((T)v); + + e++; + } + } + + if (e == r) { + Throwable ex = error.get(); + if (ex != null) { + q.clear(); + a.onError(error.terminate()); + return; + } + + while (q.peek() == NotificationLite.COMPLETE) { + q.drop(); + } + + if (q.consumerIndex() == sourceCount) { + a.onComplete(); + return; + } + } + + consumed = e; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + + } + + void drainFused() { + int missed = 1; + Subscriber a = actual; + SimpleQueueWithConsumerIndex q = queue; + + for (;;) { + if (cancelled) { + q.clear(); + return; + } + Throwable ex = error.get(); + if (ex != null) { + q.clear(); + a.onError(ex); + return; + } + + boolean d = q.producerIndex() == sourceCount; + + if (!q.isEmpty()) { + a.onNext(null); + } + + if (d) { + a.onComplete(); + return; + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + if (outputFused) { + drainFused(); + } else { + drainNormal(); + } + } + } + + interface SimpleQueueWithConsumerIndex extends SimpleQueue { + + @Override + T poll(); + + T peek(); + + void drop(); + + int consumerIndex(); + + int producerIndex(); + } + + static final class MpscFillOnceSimpleQueue + extends AtomicReferenceArray + implements SimpleQueueWithConsumerIndex { + + /** */ + private static final long serialVersionUID = -7969063454040569579L; + final AtomicInteger producerIndex; + + int consumerIndex; + + public MpscFillOnceSimpleQueue(int length) { + super(length); + this.producerIndex = new AtomicInteger(); + } + + @Override + public boolean offer(T value) { + ObjectHelper.requireNonNull(value, "value is null"); + int idx = producerIndex.getAndIncrement(); + if (idx < length()) { + lazySet(idx, value); + return true; + } + return false; + } + + @Override + public boolean offer(T v1, T v2) { + throw new UnsupportedOperationException(); + } + + @Override + public T poll() { + int ci = consumerIndex; + if (ci == length()) { + return null; + } + AtomicInteger pi = producerIndex; + for (;;) { + T v = get(ci); + if (v != null) { + consumerIndex = ci + 1; + lazySet(ci, null); + return v; + } + if (pi.get() == ci) { + return null; + } + } + } + + @Override + public T peek() { + int ci = consumerIndex; + if (ci == length()) { + return null; + } + return get(ci); + } + + @Override + public void drop() { + int ci = consumerIndex; + lazySet(ci, null); + consumerIndex = ci + 1; + } + + @Override + public boolean isEmpty() { + return consumerIndex == producerIndex(); + } + + @Override + public void clear() { + while (poll() != null && !isEmpty()); + } + + @Override + public int consumerIndex() { + return consumerIndex; + } + + @Override + public int producerIndex() { + return producerIndex.get(); + } + } + + static final class ClqSimpleQueue extends ConcurrentLinkedQueue implements SimpleQueueWithConsumerIndex { + + /** */ + private static final long serialVersionUID = -4025173261791142821L; + + int consumerIndex; + + final AtomicInteger producerIndex; + + public ClqSimpleQueue() { + this.producerIndex = new AtomicInteger(); + } + + @Override + public boolean offer(T v1, T v2) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(T e) { + producerIndex.getAndIncrement(); + return super.offer(e); + } + + @Override + public T poll() { + T v = super.poll(); + if (v != null) { + consumerIndex++; + } + return v; + } + + @Override + public int consumerIndex() { + return consumerIndex; + } + + @Override + public int producerIndex() { + return producerIndex.get(); + } + + @Override + public void drop() { + poll(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeToPublisher.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeToPublisher.java new file mode 100644 index 0000000000..f00b7e7b48 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeToPublisher.java @@ -0,0 +1,36 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.reactivestreams.Publisher; + +import io.reactivex.MaybeSource; +import io.reactivex.functions.Function; + +/** + * Helper function to merge/concat values of each MaybeSource provided by a Publisher. + */ +public enum MaybeToPublisher implements Function, Publisher> { + INSTANCE; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Function, Publisher> instance() { + return (Function)INSTANCE; + } + + @Override + public Publisher apply(MaybeSource t) throws Exception { + return new MaybeToFlowable(t); + } +} diff --git a/src/main/java/io/reactivex/observers/TestObserver.java b/src/main/java/io/reactivex/observers/TestObserver.java index 6c2406996b..9930f071c3 100644 --- a/src/main/java/io/reactivex/observers/TestObserver.java +++ b/src/main/java/io/reactivex/observers/TestObserver.java @@ -21,7 +21,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.CompositeException; import io.reactivex.functions.Consumer; -import io.reactivex.internal.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.internal.util.ExceptionHelper; @@ -885,6 +885,17 @@ public final TestObserver awaitDone(long time, TimeUnit unit) { return this; } + + /** + * Assert that the TestObserver has received a Disposable but no other events. + * @return this + */ + public final TestObserver assertEmpty() { + return assertSubscribed() + .assertNoValues() + .assertNoErrors() + .assertNotComplete(); + } /** * An observer that ignores all events and does not report errors. diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 8087d63b81..94da766f5e 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -541,7 +541,7 @@ static final class UnboundedReplayBuffer volatile int size; public UnboundedReplayBuffer(int capacityHint) { - this.buffer = new ArrayList(verifyPositive(capacityHint, "capacityHint")); + this.buffer = new ArrayList(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); } @Override @@ -752,7 +752,7 @@ static final class SizeBoundReplayBuffer volatile boolean done; public SizeBoundReplayBuffer(int maxSize) { - this.maxSize = verifyPositive(maxSize, "maxSize"); + this.maxSize = ObjectHelper.verifyPositive(maxSize, "maxSize"); Node h = new Node(null); this.tail = h; this.head = h; @@ -973,8 +973,8 @@ static final class SizeAndTimeBoundReplayBuffer public SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) { - this.maxSize = verifyPositive(maxSize, "maxSize"); - this.maxAge = verifyPositive(maxAge, "maxAge"); + this.maxSize = ObjectHelper.verifyPositive(maxSize, "maxSize"); + this.maxAge = ObjectHelper.verifyPositive(maxAge, "maxAge"); this.unit = ObjectHelper.requireNonNull(unit, "unit is null"); this.scheduler = ObjectHelper.requireNonNull(scheduler, "scheduler is null"); TimedNode h = new TimedNode(null, 0L); diff --git a/src/main/java/io/reactivex/processors/UnicastProcessor.java b/src/main/java/io/reactivex/processors/UnicastProcessor.java index a19f34e06d..815188eb88 100644 --- a/src/main/java/io/reactivex/processors/UnicastProcessor.java +++ b/src/main/java/io/reactivex/processors/UnicastProcessor.java @@ -101,7 +101,7 @@ public static UnicastProcessor create(int capacityHint, Runnable onCancel * @since 2.0 */ UnicastProcessor(int capacityHint) { - this.queue = new SpscLinkedArrayQueue(verifyPositive(capacityHint, "capacityHint")); + this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(); this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); @@ -117,7 +117,7 @@ public static UnicastProcessor create(int capacityHint, Runnable onCancel * @since 2.0 */ UnicastProcessor(int capacityHint, Runnable onTerminate) { - this.queue = new SpscLinkedArrayQueue(verifyPositive(capacityHint, "capacityHint")); + this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 34aa092594..4ac207d9e4 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -521,7 +521,7 @@ static final class UnboundedReplayBuffer volatile int size; public UnboundedReplayBuffer(int capacityHint) { - this.buffer = new ArrayList(verifyPositive(capacityHint, "capacityHint")); + this.buffer = new ArrayList(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); } @Override @@ -717,7 +717,7 @@ static final class SizeBoundReplayBuffer volatile boolean done; public SizeBoundReplayBuffer(int maxSize) { - this.maxSize = verifyPositive(maxSize, "maxSize"); + this.maxSize = ObjectHelper.verifyPositive(maxSize, "maxSize"); Node h = new Node(null); this.tail = h; this.head = h; @@ -920,8 +920,8 @@ static final class SizeAndTimeBoundReplayBuffer public SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) { - this.maxSize = verifyPositive(maxSize, "maxSize"); - this.maxAge = verifyPositive(maxAge, "maxAge"); + this.maxSize = ObjectHelper.verifyPositive(maxSize, "maxSize"); + this.maxAge = ObjectHelper.verifyPositive(maxAge, "maxAge"); this.unit = ObjectHelper.requireNonNull(unit, "unit is null"); this.scheduler = ObjectHelper.requireNonNull(scheduler, "scheduler is null"); TimedNode h = new TimedNode(null, 0L); diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 9557bfcbb8..ffc4597307 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -115,7 +115,7 @@ public static UnicastSubject create(int capacityHint, Runnable onCancelle * @since 2.0 */ UnicastSubject(int capacityHint) { - this.queue = new SpscLinkedArrayQueue(verifyPositive(capacityHint, "capacityHint")); + this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(); this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); @@ -130,7 +130,7 @@ public static UnicastSubject create(int capacityHint, Runnable onCancelle * @since 2.0 */ UnicastSubject(int capacityHint, Runnable onTerminate) { - this.queue = new SpscLinkedArrayQueue(verifyPositive(capacityHint, "capacityHint")); + this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); diff --git a/src/main/java/io/reactivex/subscribers/TestSubscriber.java b/src/main/java/io/reactivex/subscribers/TestSubscriber.java index 1485c2abd3..c2f8676a5f 100644 --- a/src/main/java/io/reactivex/subscribers/TestSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/TestSubscriber.java @@ -973,6 +973,17 @@ public final TestSubscriber awaitDone(long time, TimeUnit unit) { } return this; } + + /** + * Assert that the TestSubscriber has received a Subscription but no other events. + * @return this + */ + public final TestSubscriber assertEmpty() { + return assertSubscribed() + .assertNoValues() + .assertNoErrors() + .assertNotComplete(); + } /** * A subscriber that ignores all events and does not report errors. diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 34800591e9..6c020683bb 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -194,6 +194,11 @@ public static > void assertEmptyEnum(Class e) { } } + /** + * Assert that by consuming the Publisher with a bad request amount, it is + * reported to the plugin error handler promptly. + * @param source the source to consume + */ public static void assertBadRequestReported(Publisher source) { List list = trackPluginErrors(); try { @@ -309,6 +314,12 @@ public void run() { } } + /** + * Cast the given Throwable to CompositeException and returns its inner + * Throwable list. + * @param ex the target Throwable + * @return the list of Throwables + */ public static List compositeList(Throwable ex) { return ((CompositeException)ex).getExceptions(); } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeCreateTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeCreateTest.java deleted file mode 100644 index 454cdd0d4b..0000000000 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeCreateTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.maybe; - -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -import io.reactivex.*; -import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; - -public class MaybeCreateTest { - - @Test(expected = NullPointerException.class) - public void nullArgument() { - Maybe.create(null); - } - - @Test - public void basic() { - final Disposable d = Disposables.empty(); - - Maybe.create(new MaybeOnSubscribe() { - @Override - public void subscribe(MaybeEmitter e) throws Exception { - e.setDisposable(d); - - e.onSuccess(1); - e.onError(new TestException()); - e.onSuccess(2); - e.onError(new TestException()); - e.onComplete(); - } - }) - .test() - .assertResult(1); - - assertTrue(d.isDisposed()); - } - - @Test - public void basicWithError() { - final Disposable d = Disposables.empty(); - - Maybe.create(new MaybeOnSubscribe() { - @Override - public void subscribe(MaybeEmitter e) throws Exception { - e.setDisposable(d); - - e.onError(new TestException()); - e.onSuccess(2); - e.onError(new TestException()); - e.onComplete(); - } - }) - .test() - .assertFailure(TestException.class); - - assertTrue(d.isDisposed()); - } - - @Test - public void basicWithComplete() { - final Disposable d = Disposables.empty(); - - Maybe.create(new MaybeOnSubscribe() { - @Override - public void subscribe(MaybeEmitter e) throws Exception { - e.setDisposable(d); - - e.onComplete(); - e.onSuccess(1); - e.onError(new TestException()); - e.onComplete(); - e.onSuccess(2); - e.onError(new TestException()); - } - }) - .test() - .assertResult(); - - assertTrue(d.isDisposed()); - } - - @Test(expected = IllegalArgumentException.class) - public void unsafeCreate() { - Maybe.unsafeCreate(Maybe.just(1)); - } - - @Test(expected = NullPointerException.class) - public void unsafeCreateNull() { - Maybe.unsafeCreate(null); - } -} diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index ae96c17c1f..c74ce949e4 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -24,14 +24,16 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.internal.operators.maybe.MaybeToPublisher; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; -import io.reactivex.subscribers.TestSubscriber; +import io.reactivex.subscribers.*; public class MaybeTest { @Test @@ -994,6 +996,18 @@ public MaybeSource apply(Integer v) throws Exception { .assertResult(10); } + @Test + public void concatMap() { + Maybe.just(1).concatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v * 10); + } + }) + .test() + .assertResult(10); + } + @Test public void flatMapEmpty() { Maybe.just(1).flatMap(new Function>() { @@ -1183,5 +1197,1083 @@ public void errorToCompletable() { .test() .assertFailure(TestException.class); } + + @Test + public void concat2() { + Maybe.concat(Maybe.just(1), Maybe.just(2)) + .test() + .assertResult(1, 2); + } + + @Test + public void concat2Empty() { + Maybe.concat(Maybe.empty(), Maybe.empty()) + .test() + .assertResult(); + } + + @Test + public void concat2Backpressured() { + TestSubscriber ts = Maybe.concat(Maybe.just(1), Maybe.just(2)) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(1); + + ts.assertResult(1, 2); + } + + @Test + public void concat2BackpressuredNonEager() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.concat(pp1.toMaybe(), pp2.toMaybe()) + .test(0L); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertEmpty(); + + ts.request(1); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onNext(1); + pp1.onComplete(); + + ts.assertValue(1); + + assertFalse(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + ts.request(1); + + ts.assertValue(1); + + pp2.onNext(2); + pp2.onComplete(); + + ts.assertResult(1, 2); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + } + + @Test + public void concat3() { + Maybe.concat(Maybe.just(1), Maybe.just(2), Maybe.just(3)) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void concat3Empty() { + Maybe.concat(Maybe.empty(), Maybe.empty(), Maybe.empty()) + .test() + .assertResult(); + } + + @Test + public void concat3Mixed1() { + Maybe.concat(Maybe.just(1), Maybe.empty(), Maybe.just(3)) + .test() + .assertResult(1, 3); + } + + @Test + public void concat3Mixed2() { + Maybe.concat(Maybe.just(1), Maybe.just(2), Maybe.empty()) + .test() + .assertResult(1, 2); + } + + @Test + public void concat3Backpressured() { + TestSubscriber ts = Maybe.concat(Maybe.just(1), Maybe.just(2), Maybe.just(3)) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(2); + + ts.assertResult(1, 2, 3); + } + + @SuppressWarnings("unchecked") + @Test + public void concatArrayZero() { + assertSame(Flowable.empty(), Maybe.concatArray()); + } + + @SuppressWarnings("unchecked") + @Test + public void concatArrayOne() { + Maybe.concatArray(Maybe.just(1)).test().assertResult(1); + } + + @Test + public void concat4() { + Maybe.concat(Maybe.just(1), Maybe.just(2), Maybe.just(3), Maybe.just(4)) + .test() + .assertResult(1, 2, 3, 4); + } + + @SuppressWarnings("unchecked") + @Test + public void concatIterable() { + Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.just(2))) + .test() + .assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void concatIterableEmpty() { + Maybe.concat(Arrays.asList(Maybe.empty(), Maybe.empty())) + .test() + .assertResult(); + } + + @SuppressWarnings("unchecked") + @Test + public void concatIterableBackpressured() { + TestSubscriber ts = Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.just(2))) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(1); + + ts.assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void concatIterableBackpressuredNonEager() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.concat(Arrays.asList(pp1.toMaybe(), pp2.toMaybe())) + .test(0L); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertEmpty(); + + ts.request(1); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onNext(1); + pp1.onComplete(); + + ts.assertValue(1); + + assertFalse(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + ts.request(1); + + ts.assertValue(1); + + pp2.onNext(2); + pp2.onComplete(); + + ts.assertResult(1, 2); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + } + + @Test + public void concatIterableZero() { + Maybe.concat(Collections.>emptyList()).test().assertResult(); + } + + @Test + public void concatIterableOne() { + Maybe.concat(Collections.>singleton(Maybe.just(1))).test().assertResult(1); + } + + @Test + public void concatPublisher() { + Maybe.concat(Flowable.just(Maybe.just(1), Maybe.just(2))).test().assertResult(1, 2); + } + + @Test + public void concatPublisherPrefetch() { + Maybe.concat(Flowable.just(Maybe.just(1), Maybe.just(2)), 1).test().assertResult(1, 2); + } + + @Test(expected = NullPointerException.class) + public void nullArgument() { + Maybe.create(null); + } + + @Test + public void basic() { + final Disposable d = Disposables.empty(); + + Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter e) throws Exception { + e.setDisposable(d); + + e.onSuccess(1); + e.onError(new TestException()); + e.onSuccess(2); + e.onError(new TestException()); + e.onComplete(); + } + }) + .test() + .assertResult(1); + + assertTrue(d.isDisposed()); + } + + @Test + public void basicWithError() { + final Disposable d = Disposables.empty(); + + Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter e) throws Exception { + e.setDisposable(d); + + e.onError(new TestException()); + e.onSuccess(2); + e.onError(new TestException()); + e.onComplete(); + } + }) + .test() + .assertFailure(TestException.class); + + assertTrue(d.isDisposed()); + } + + @Test + public void basicWithComplete() { + final Disposable d = Disposables.empty(); + + Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter e) throws Exception { + e.setDisposable(d); + + e.onComplete(); + e.onSuccess(1); + e.onError(new TestException()); + e.onComplete(); + e.onSuccess(2); + e.onError(new TestException()); + } + }) + .test() + .assertResult(); + + assertTrue(d.isDisposed()); + } + + @Test(expected = IllegalArgumentException.class) + public void unsafeCreateWithMaybe() { + Maybe.unsafeCreate(Maybe.just(1)); + } + + @Test + public void maybeToPublisherEnum() { + TestHelper.checkEnum(MaybeToPublisher.class); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArrayEmpty() { + assertSame(Maybe.empty(), Maybe.ambArray()); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArrayOne() { + assertSame(Maybe.never(), Maybe.ambArray(Maybe.never())); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArray1SignalsSuccess() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.ambArray(pp1.toMaybe(), pp2.toMaybe()) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp1.onNext(1); + pp1.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArray2SignalsSuccess() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.ambArray(pp1.toMaybe(), pp2.toMaybe()) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onNext(2); + pp2.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(2); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArray1SignalsError() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.ambArray(pp1.toMaybe(), pp2.toMaybe()) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp1.onError(new TestException()); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArray2SignalsError() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.ambArray(pp1.toMaybe(), pp2.toMaybe()) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onError(new TestException()); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArray1SignalsComplete() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.ambArray(pp1.toMaybe(), pp2.toMaybe()) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp1.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArray2SignalsComplete() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.ambArray(pp1.toMaybe(), pp2.toMaybe()) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterable1SignalsSuccess() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.amb(Arrays.asList(pp1.toMaybe(), pp2.toMaybe())) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp1.onNext(1); + pp1.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterable2SignalsSuccess() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.amb(Arrays.asList(pp1.toMaybe(), pp2.toMaybe())) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onNext(2); + pp2.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(2); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterable1SignalsError() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.amb(Arrays.asList(pp1.toMaybe(), pp2.toMaybe())) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp1.onError(new TestException()); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + @SuppressWarnings("unchecked") + @Test + public void ambIterable2SignalsError() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.amb(Arrays.asList(pp1.toMaybe(), pp2.toMaybe())) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onError(new TestException()); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterable1SignalsComplete() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.amb(Arrays.asList(pp1.toMaybe(), pp2.toMaybe())) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp1.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterable2SignalsComplete() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.amb(Arrays.asList(pp1.toMaybe(), pp2.toMaybe())) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(); + } + + @Test + public void ambIterableEmpty() { + Maybe.amb(Collections.>emptyList()).test().assertResult(); + } + + @Test + public void ambIterableOne() { + Maybe.amb(Collections.singleton(Maybe.just(1))).test().assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArray() { + Maybe.mergeArray(Maybe.just(1), Maybe.just(2), Maybe.just(3)) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void merge2() { + Maybe.merge(Maybe.just(1), Maybe.just(2)) + .test() + .assertResult(1, 2); + } + + @Test + public void merge3() { + Maybe.merge(Maybe.just(1), Maybe.just(2), Maybe.just(3)) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void merge4() { + Maybe.merge(Maybe.just(1), Maybe.just(2), Maybe.just(3), Maybe.just(4)) + .test() + .assertResult(1, 2, 3, 4); + } + + @Test + public void merge4Take2() { + Maybe.merge(Maybe.just(1), Maybe.just(2), Maybe.just(3), Maybe.just(4)) + .take(2) + .test() + .assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayBackpressured() { + TestSubscriber ts = Maybe.mergeArray(Maybe.just(1), Maybe.just(2), Maybe.just(3)) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(1); + + ts.assertValues(1, 2); + + ts.request(1); + ts.assertResult(1, 2, 3); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayBackpressuredMixed1() { + TestSubscriber ts = Maybe.mergeArray(Maybe.just(1), Maybe.empty(), Maybe.just(3)) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(1); + + ts.assertResult(1, 3); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayBackpressuredMixed2() { + TestSubscriber ts = Maybe.mergeArray(Maybe.just(1), Maybe.just(2), Maybe.empty()) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(1); + + ts.assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayBackpressuredMixed3() { + TestSubscriber ts = Maybe.mergeArray(Maybe.empty(), Maybe.just(2), Maybe.just(3)) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(2); + + ts.request(1); + + ts.assertResult(2, 3); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Maybe.mergeArray(Maybe.just(1), Maybe.just(2), Maybe.just(3)).subscribe(ts); + + ts.assertSubscribed() + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.ASYNC)) + .assertResult(1, 2, 3); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayFusedRace() { + for (int i = 0; i < 500; i++) { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Maybe.mergeArray(pp1.toMaybe(), pp2.toMaybe()).subscribe(ts); + + ts.assertSubscribed() + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.ASYNC)) + ; + + TestHelper.race(new Runnable() { + @Override + public void run() { + pp1.onNext(1); + pp1.onComplete(); + } + }, new Runnable() { + @Override + public void run() { + pp2.onNext(1); + pp2.onComplete(); + } + }, Schedulers.single()); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 1); + } + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayZero() { + assertSame(Flowable.empty(), Maybe.mergeArray()); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeArrayOne() { + Maybe.mergeArray(Maybe.just(1)).test().assertResult(1); + } + + @Test + public void mergePublisher() { + Maybe.merge(Flowable.just(Maybe.just(1), Maybe.just(2), Maybe.just(3))) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void mergePublisherMaxConcurrent() { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Maybe.merge(Flowable.just(pp1.toMaybe(), pp2.toMaybe()), 1).test(0L); + + assertTrue(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + pp1.onNext(1); + pp1.onComplete(); + + ts.request(1); + + assertFalse(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + } + + @Test + public void mergeMaybe() { + Maybe.merge(Maybe.just(Maybe.just(1))) + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void mergeIterable() { + Maybe.merge(Arrays.asList(Maybe.just(1), Maybe.just(2), Maybe.just(3))) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void mergeALot() { + @SuppressWarnings("unchecked") + Maybe[] sources = new Maybe[Flowable.bufferSize() * 2]; + Arrays.fill(sources, Maybe.just(1)); + + Maybe.mergeArray(sources) + .test() + .assertSubscribed() + .assertValueCount(sources.length) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mergeALotLastEmpty() { + @SuppressWarnings("unchecked") + Maybe[] sources = new Maybe[Flowable.bufferSize() * 2]; + Arrays.fill(sources, Maybe.just(1)); + sources[sources.length - 1] = Maybe.empty(); + + Maybe.mergeArray(sources) + .test() + .assertSubscribed() + .assertValueCount(sources.length - 1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mergeALotFused() { + @SuppressWarnings("unchecked") + Maybe[] sources = new Maybe[Flowable.bufferSize() * 2]; + Arrays.fill(sources, Maybe.just(1)); + + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Maybe.mergeArray(sources).subscribe(ts); + + ts + .assertSubscribed() + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.ASYNC)) + .assertValueCount(sources.length) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mergeErrorSuccess() { + Maybe.merge(Maybe.error(new TestException()), Maybe.just(1)) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mergeSuccessError() { + Maybe.merge(Maybe.just(1), Maybe.error(new TestException())) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void subscribeZero() { + assertTrue(Maybe.just(1) + .subscribe().isDisposed()); + } + + @Test + public void subscribeZeroError() { + List errors = TestHelper.trackPluginErrors(); + + try { + assertTrue(Maybe.error(new TestException()) + .subscribe().isDisposed()); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void subscribeToOnSuccess() { + final List values = new ArrayList(); + + Consumer onSuccess = new Consumer() { + @Override + public void accept(Integer e) throws Exception { + values.add(e); + } + }; + + Maybe source = Maybe.just(1); + + source.subscribe(onSuccess); + source.subscribe(onSuccess, Functions.emptyConsumer()); + source.subscribe(onSuccess, Functions.emptyConsumer(), Functions.EMPTY_ACTION); + + assertEquals(Arrays.asList(1, 1, 1), values); + } + + @Test + public void subscribeToOnError() { + final List values = new ArrayList(); + + Consumer onError = new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + values.add(e); + } + }; + + TestException ex = new TestException(); + + Maybe source = Maybe.error(ex); + + source.subscribe(Functions.emptyConsumer(), onError); + source.subscribe(Functions.emptyConsumer(), onError, Functions.EMPTY_ACTION); + + assertEquals(Arrays.asList(ex, ex), values); + } + + @Test + public void subscribeToOnComplete() { + final List values = new ArrayList(); + + Action onComplete = new Action() { + @Override + public void run() throws Exception { + values.add(100); + } + }; + + Maybe source = Maybe.empty(); + + source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), onComplete); + + assertEquals(Arrays.asList(100), values); + } + + @Test + public void subscribeWith() { + MaybeObserver mo = new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onSuccess(Integer value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + + }; + + assertSame(mo, Maybe.just(1).subscribeWith(mo)); + } + + @Test + public void doOnEventSuccess() { + final List list = new ArrayList(); + + assertTrue(Maybe.just(1) + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + list.add(v); + list.add(e); + } + }) + .subscribe().isDisposed()); + + assertEquals(Arrays.asList(1, null), list); + } + + @Test + public void doOnEventError() { + final List list = new ArrayList(); + + TestException ex = new TestException(); + + assertTrue(Maybe.error(ex) + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + list.add(v); + list.add(e); + } + }) + .subscribe().isDisposed()); + + assertEquals(Arrays.asList(null, ex), list); + } + + @Test + public void doOnEventComplete() { + final List list = new ArrayList(); + + assertTrue(Maybe.empty() + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + list.add(v); + list.add(e); + } + }) + .subscribe().isDisposed()); + + assertEquals(Arrays.asList(null, null), list); + } + + @Test(expected = NullPointerException.class) + public void doOnEventNull() { + Maybe.just(1).doOnEvent(null); + } + + @Test + public void doOnEventSuccessThrows() { + Maybe.just(1) + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doOnEventErrorThrows() { + TestSubscriber ts = Maybe.error(new TestException("Outer")) + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List list = TestHelper.compositeList(ts.errors().get(0)); + TestHelper.assertError(list, 0, TestException.class, "Outer"); + TestHelper.assertError(list, 1, TestException.class, "Inner"); + assertEquals(2, list.size()); + } + + + @Test + public void doOnEventCompleteThrows() { + Maybe.empty() + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } }