Skip to content

1.x: concatMap full rewrite + delayError + performance #3759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,9 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<?
* {@code observables}, one after the other, without interleaving them
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
return observables.lift(OperatorConcat.<T>instance());
return observables.concatMap((Func1)UtilityFunctions.identity());
}

/**
Expand Down Expand Up @@ -1158,6 +1159,45 @@ public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<?
return concat(just(t1, t2, t3, t4, t5, t6, t7, t8, t9));
}

/**
* Concatenates the Observable sequence of Observables into a single sequence by subscribing to each inner Observable,
* one after the other, one at a time and delays any errors till the all inner and the outer Observables terminate.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sources the Observable sequence of Observables
* @return the new Observable with the concatenating behavior
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@Experimental
public static <T> Observable<T> concatDelayError(Observable<? extends Observable<? extends T>> sources) {
return sources.concatMapDelayError((Func1)UtilityFunctions.identity());
}

/**
* Concatenates the Iterable sequence of Observables into a single sequence by subscribing to each Observable,
* one after the other, one at a time and delays any errors till the all inner Observables terminate.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sources the Iterable sequence of Observables
* @return the new Observable with the concatenating behavior
*/
@Experimental
public static <T> Observable<T> concatDelayError(Iterable<? extends Observable<? extends T>> sources) {
return concatDelayError(from(sources));
}

/**
* Returns an Observable that calls an Observable factory to create an Observable for each new Observer
* that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is
Expand Down Expand Up @@ -3957,7 +3997,37 @@ public final R call(R state, T value) {
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return concat(map(func));
if (this instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
return scalar.scalarFlatMap(func);
}
return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
}

/**
* Maps each of the items into an Observable, subscribes to them one after the other,
* one at a time and emits their values in order
* while delaying any error from either this or any of the inner Observables
* till all of them terminate.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code concatMapDelayError} fully supports backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param func the function that maps the items of this Observable into the inner Observables.
* @return the new Observable instance with the concatenation behavior
*/
@Experimental
public final <R> Observable<R> concatMapDelayError(Func1<? super T, ? extends Observable<?extends R>> func) {
if (this instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
return scalar.scalarFlatMap(func);
}
return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.END));
}

/**
Expand Down
41 changes: 32 additions & 9 deletions src/main/java/rx/exceptions/CompositeException.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@
*/
package rx.exceptions;

import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.io.*;
import java.util.*;

import rx.annotations.Experimental;

/**
* Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException}
Expand Down Expand Up @@ -73,6 +68,34 @@ public CompositeException(Collection<? extends Throwable> errors) {
this(null, errors);
}

/**
* Constructs a CompositeException instance with the supplied initial Throwables.
* @param errors the array of Throwables
*/
@Experimental
public CompositeException(Throwable... errors) {
Set<Throwable> deDupedExceptions = new LinkedHashSet<Throwable>();
List<Throwable> _exceptions = new ArrayList<Throwable>();
if (errors != null) {
for (Throwable ex : errors) {
if (ex instanceof CompositeException) {
deDupedExceptions.addAll(((CompositeException) ex).getExceptions());
} else
if (ex != null) {
deDupedExceptions.add(ex);
} else {
deDupedExceptions.add(new NullPointerException());
}
}
} else {
deDupedExceptions.add(new NullPointerException());
}

_exceptions.addAll(deDupedExceptions);
this.exceptions = Collections.unmodifiableList(_exceptions);
this.message = exceptions.size() + " exceptions occurred. ";
}

/**
* Retrieves the list of exceptions that make up the {@code CompositeException}
*
Expand Down
Loading