diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index f9dd5b9428..7ed2fd4a42 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2217,6 +2217,21 @@ public static Observable timer(long initialDelay, long period, TimeUnit un public static Observable timer(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler)); } + + /** + * Return an Observable which concatenates the observable sequences obtained by running the + * resultSelector for each element in the given Iterable source. + * @param the Iterable sequence value type + * @param the result type + * @param source the source iterable + * @param resultSelector the selector function that returns an Observable + * sequence for each value of the {@code source} iterable sequence + * @return an Observable which concatenates the observable sequences obtained by running the + * resultSelector for each element in the given Iterable source. + */ + public static Observable forIterable(Iterable source, Func1> resultSelector) { + return create(OperationConcat.forIterable(source, resultSelector)); + } /** * Returns an Observable that emits the items emitted by the source diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index e2c9be1ebf..52428c3a51 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -15,6 +15,7 @@ */ package rx.operators; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -23,6 +24,8 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.SerialSubscription; +import rx.util.functions.Func1; /** * Returns an Observable that emits the items emitted by two or more Observables, one after the @@ -168,4 +171,93 @@ public void unsubscribe() { }; } } + + /** + * Concatenates the observable sequences obtained by running the + * resultSelector for each element in the given Iterable source. + * @param the source value type + * @param the result value type + * @param source the source sequence + * @param resultSelector the result selector function to return + * an Observable sequence for concatenation + * @return a subscriber function + */ + public static OnSubscribeFunc forIterable(Iterable source, Func1> resultSelector) { + return new For(source, resultSelector); + } + + /** + * For each source element in an iterable, concatenate + * the observables returned by a selector function. + * @param the source value type + * @param the result value type + */ + private static final class For implements OnSubscribeFunc { + final Iterable source; + final Func1> resultSelector; + + public For(Iterable source, Func1> resultSelector) { + this.source = source; + this.resultSelector = resultSelector; + } + + @Override + public Subscription onSubscribe(Observer t1) { + SerialSubscription ssub = new SerialSubscription(); + Iterator it = source.iterator(); + ValueObserver vo = new ValueObserver(t1, ssub, it, resultSelector); + vo.onCompleted(); // trigger the first subscription + return ssub; + } + } + /** + * The observer of values in the returned observables. + */ + private static final class ValueObserver implements Observer { + final Observer observer; + final SerialSubscription cancel; + final Iterator iterator; + final Func1> resultSelector; + public ValueObserver( + Observer observer, + SerialSubscription cancel, + Iterator iterator, + Func1> resultSelector + ) { + this.observer = observer; + this.cancel = cancel; + this.iterator = iterator; + this.resultSelector = resultSelector; + } + + @Override + public void onNext(R args) { + observer.onNext(args); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + cancel.unsubscribe(); + } + + @Override + public void onCompleted() { + try { + if (iterator.hasNext()) { + T v = iterator.next(); + Observable o = resultSelector.call(v); + cancel.setSubscription(o.subscribe(this)); + return; + } + } catch (Throwable t) { + observer.onError(t); + cancel.unsubscribe(); + return; + } + observer.onCompleted(); + cancel.unsubscribe(); + } + + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java index 24146ec131..68758f61a6 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java @@ -21,9 +21,11 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -32,8 +34,10 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Func1; public class OperationConcatTest { @@ -598,4 +602,252 @@ public void testMultipleObservers() { verify(o1, never()).onError(any(Throwable.class)); verify(o2, never()).onError(any(Throwable.class)); } + + @Test + public void testForIterable() { + List keys = Arrays.asList(0, 1, 2); + final List> values = Arrays.asList( + Observable.from(1, 2, 3), + Observable.from(4, 5, 6), + Observable.from(7, 8, 9) + ); + Func1> selector = new Func1>() { + + @Override + public Observable call(Integer t1) { + return values.get(t1); + } + }; + + Observable result = Observable.forIterable(keys, selector); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + for (int i = 1; i < 10; i++) { + inOrder.verify(o, times(1)).onNext(i); + } + inOrder.verify(o, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testForIterableLongSequence() { + int n = 250; + List keys = new ArrayList(n); + List expected = new ArrayList(3 * n); + for (int i = 1; i <= n; i++) { + keys.add(n); + for (int j = 1; j <= 3; j++) { + expected.add(j); + } + } + + Func1> selector = new Func1>() { + @Override + public Observable call(Integer t1) { + return Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread()); + } + }; + + Observable result = Observable.forIterable(keys, selector); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + for (Integer i : expected) { + inOrder.verify(o, times(1)).onNext(i); + } + inOrder.verify(o, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + + } + @Test + public void testForIterableEmpty() { + List keys = Arrays.asList(); + final AtomicBoolean selectorCalled = new AtomicBoolean(); + Func1> selector = new Func1>() { + @Override + public Observable call(Integer t1) { + selectorCalled.set(true); + throw new IllegalStateException("Shouldn't get here!"); + } + }; + Observable result = Observable.forIterable(keys, selector); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + inOrder.verify(o, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onError(any(Throwable.class)); + + assertFalse(selectorCalled.get()); + } + static final class CustomException extends RuntimeException { + public CustomException(String message) { + super(message); + } + } + @Test + public void testForIterableIteratorHasNextThrows() { + final AtomicBoolean nextCalled = new AtomicBoolean(); + Iterable keys = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + @Override + public boolean hasNext() { + throw new CustomException("Forced failure"); + } + + @Override + public Integer next() { + nextCalled.set(true); + throw new IllegalStateException("Shouldn't get here!"); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only sequence"); + } + + }; + } + + }; + + final AtomicBoolean selectorCalled = new AtomicBoolean(); + + Func1> selector = new Func1>() { + @Override + public Observable call(Integer t1) { + selectorCalled.set(true); + throw new IllegalStateException("Shouldn't get here!"); + } + }; + + Observable result = Observable.forIterable(keys, selector); + Observer o = mock(Observer.class); + + result.subscribe(o); + + verify(o, times(1)).onError(any(CustomException.class)); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + + assertFalse(selectorCalled.get()); + assertFalse(nextCalled.get()); + } + @Test + public void testForIterableIteratorNextThrows() { + Iterable keys = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + throw new CustomException("Forced failure"); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only sequence"); + } + + }; + } + + }; + + final AtomicBoolean selectorCalled = new AtomicBoolean(); + + Func1> selector = new Func1>() { + @Override + public Observable call(Integer t1) { + selectorCalled.set(true); + throw new IllegalStateException("Shouldn't get here!"); + } + }; + + Observable result = Observable.forIterable(keys, selector); + Observer o = mock(Observer.class); + + result.subscribe(o); + + verify(o, times(1)).onError(any(CustomException.class)); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + + assertFalse(selectorCalled.get()); + } + + @Test + public void testForIterableSelectorThrows() { + List keys = Arrays.asList(0); + + + Func1> selector = new Func1>() { + @Override + public Observable call(Integer t1) { + throw new CustomException("Forced failure!"); + } + }; + + Observable result = Observable.forIterable(keys, selector); + Observer o = mock(Observer.class); + + result.subscribe(o); + + verify(o, times(1)).onError(any(CustomException.class)); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + + @Test + public void testForIterableOneThrows() { + List keys = Arrays.asList(0, 1, 2); + final List> values = Arrays.asList( + Observable.from(1, 2, 3), + Observable.from(4, 5, 6), + Observable.error(new CustomException("Forced failure")) + ); + Func1> selector = new Func1>() { + + @Override + public Observable call(Integer t1) { + return values.get(t1); + } + }; + + Observable result = Observable.forIterable(keys, selector); + + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + for (int i = 1; i < 7; i++) { + inOrder.verify(o, times(1)).onNext(i); + } + inOrder.verify(o, times(1)).onError(any(CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onCompleted(); + } }