diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d24e48106c..e332098ba2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -41,6 +41,8 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; +import rx.operators.OperationTake; +import rx.operators.OperationTakeWhile; import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -54,7 +56,6 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; import rx.operators.OperationTakeLast; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1779,7 +1780,7 @@ public static Observable takeLast(final Observable items, final int co * @return */ public static Observable takeWhile(final Observable items, Func1 predicate) { - return create(OperationTake.takeWhile(items, predicate)); + return create(OperationTakeWhile.takeWhile(items, predicate)); } /** @@ -1811,16 +1812,18 @@ public Boolean call(T t) { * @return */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { - return create(OperationTake.takeWhileWithIndex(items, predicate)); + return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); } public static Observable takeWhileWithIndex(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); - return create(OperationTake.takeWhileWithIndex(items, new Func2() { + return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2() + { @Override - public Boolean call(T t, Integer integer) { + public Boolean call(T t, Integer integer) + { return (Boolean) _f.call(t, integer); } })); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index e86263e562..c4335b71d8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,21 +15,26 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; - import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import rx.util.functions.Func2; -import rx.subjects.Subject; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static rx.testing.TrustedObservableTester.assertTrustedObservable; + /** * Returns a specified number of contiguous values from the start of an observable sequence. */ @@ -43,61 +48,17 @@ public final class OperationTake { * @return */ public static Func1, Subscription> take(final Observable items, final int num) { - return takeWhileWithIndex(items, OperationTake. numPredicate(num)); - } - - /** - * Returns a specified number of contiguous values from the start of an observable sequence. - * - * @param items - * @param predicate - * a function to test each source element for a condition - * @return - */ - public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { - return takeWhileWithIndex(items, OperationTake. skipIndex(predicate)); - } - - /** - * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. - * - * @param items - * @param predicate - * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return - */ - public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - return new TakeWhile(items, predicate).call(observer); + return new Take(items, num).call(observer); } }; } - private static Func2 numPredicate(final int num) { - return new Func2() { - - @Override - public Boolean call(T input, Integer index) { - return index < num; - } - - }; - } - - private static Func2 skipIndex(final Func1 underlying) { - return new Func2() { - @Override - public Boolean call(T input, Integer index) { - return underlying.call(input); - } - }; - } - /** * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. *

@@ -109,19 +70,41 @@ public Boolean call(T input, Integer index) { * * @param */ - private static class TakeWhile implements Func1, Subscription> { + private static class Take implements Func1, Subscription> { private final AtomicInteger counter = new AtomicInteger(); private final Observable items; - private final Func2 predicate; + private final int num; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - private TakeWhile(Observable items, Func2 predicate) { + private Take(Observable items, int num) { this.items = items; - this.predicate = predicate; + this.num = num; } @Override public Subscription call(Observer observer) { + if (num < 1) { + items.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + } + }).unsubscribe(); + observer.onCompleted(); + return Subscriptions.empty(); + } + return subscription.wrap(items.subscribe(new ItemObserver(observer))); } @@ -134,20 +117,28 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - observer.onCompleted(); + if (counter.getAndSet(num) < num) { + observer.onCompleted(); + } } @Override public void onError(Exception e) { - observer.onError(e); + if (counter.getAndSet(num) < num) { + observer.onError(e); + } } @Override public void onNext(T args) { - if (predicate.call(args, counter.getAndIncrement())) { + final int count = counter.incrementAndGet(); + if (count <= num) { observer.onNext(args); - } else { - observer.onCompleted(); + if (count == num) { + observer.onCompleted(); + } + } + if (count >= num) { // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable subscription.unsubscribe(); } @@ -160,65 +151,9 @@ public void onNext(T args) { public static class UnitTest { @Test - public void testTakeWhile1() { - Observable w = Observable.toObservable(1, 2, 3); - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhileOnSubject1() { - Subject s = Subject.create(); - Observable w = (Observable)s; - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - - s.onNext(1); - s.onNext(2); - s.onNext(3); - s.onNext(4); - s.onNext(5); - s.onCompleted(); - - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onNext(4); - verify(aObserver, never()).onNext(5); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhile2() { + public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(takeWhileWithIndex(w, new Func2() { - @Override - public Boolean call(String input, Integer index) { - return index < 2; - } - })); + Observable take = Observable.create(assertTrustedObservable(take(w, 2))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -231,33 +166,59 @@ public Boolean call(String input, Integer index) { } @Test - public void testTake1() { + public void testTake2() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 2)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); take.subscribe(aObserver); verify(aObserver, times(1)).onNext("one"); - verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onNext("two"); verify(aObserver, never()).onNext("three"); verify(aObserver, never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); } @Test - public void testTake2() { - Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 1)); + public void testTakeDoesntLeakErrors() { + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + Observable.create(assertTrustedObservable(take(source, 1))).last(); + } - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext("one"); - verify(aObserver, never()).onNext("two"); - verify(aObserver, never()).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); + @Test + public void testTakeZeroDoesntLeakError() { + final AtomicBoolean subscribed = new AtomicBoolean(false); + final AtomicBoolean unSubscribed = new AtomicBoolean(false); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + subscribed.set(true); + observer.onError(new Exception("test failed")); + return new Subscription() + { + @Override + public void unsubscribe() + { + unSubscribed.set(true); + } + }; + } + }); + Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); + assertTrue("source subscribed", subscribed.get()); + assertTrue("source unsubscribed", unSubscribed.get()); } @Test @@ -267,7 +228,7 @@ public void testUnsubscribeAfterTake() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); take.subscribe(aObserver); // wait for the Observable to complete diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java new file mode 100644 index 0000000000..f45efabc92 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -0,0 +1,313 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.subjects.Subject; +/** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + */ +public final class OperationTakeWhile { + + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param items + * @param predicate + * a function to test each source element for a condition + * @return + */ + public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { + return takeWhileWithIndex(items, OperationTakeWhile.skipIndex(predicate)); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param items + * @param predicate + * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { + // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. + return new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return new TakeWhile(items, predicate).call(observer); + } + + }; + } + + private static Func2 skipIndex(final Func1 underlying) { + return new Func2() { + @Override + public Boolean call(T input, Integer index) { + return underlying.call(input); + } + }; + } + + /** + * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. + *

+ * It IS thread-safe from within it while receiving onNext events from multiple threads. + *

+ * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above. + *

+ * Note how the takeWhileWithIndex() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow. + * + * @param + */ + private static class TakeWhile implements Func1, Subscription> { + private final AtomicInteger counter = new AtomicInteger(); + private final Observable items; + private final Func2 predicate; + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + private TakeWhile(Observable items, Func2 predicate) { + this.items = items; + this.predicate = predicate; + } + + @Override + public Subscription call(Observer observer) { + return subscription.wrap(items.subscribe(new ItemObserver(observer))); + } + + private class ItemObserver implements Observer { + private final Observer observer; + + public ItemObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(T args) { + if (predicate.call(args, counter.getAndIncrement())) { + observer.onNext(args); + } else { + observer.onCompleted(); + // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable + subscription.unsubscribe(); + } + } + + } + + } + + public static class UnitTest { + + @Test + public void testTakeWhile1() { + Observable w = Observable.toObservable(1, 2, 3); + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileOnSubject1() { + Subject s = Subject.create(); + Observable w = (Observable)s; + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onNext(4); + s.onNext(5); + s.onCompleted(); + + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onNext(4); + verify(aObserver, never()).onNext(5); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhile2() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String input, Integer index) + { + return index < 2; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileDoesntLeakErrors() { + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + + Observable.create(takeWhile(source, new Func1() + { + @Override + public Boolean call(String s) + { + return false; + } + })).last(); + } + + @Test + public void testUnsubscribeAfterTake() { + Subscription s = mock(Subscription.class); + TestObservable w = new TestObservable(s, "one", "two", "three"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String s, Integer index) + { + return index < 1; + } + })); + take.subscribe(aObserver); + + // wait for the Observable to complete + try { + w.t.join(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + System.out.println("TestObservable thread finished"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(s, times(1)).unsubscribe(); + } + + private static class TestObservable extends Observable { + + final Subscription s; + final String[] values; + Thread t = null; + + public TestObservable(Subscription s, String... values) { + this.s = s; + this.values = values; + } + + @Override + public Subscription subscribe(final Observer observer) { + System.out.println("TestObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestObservable thread"); + for (String s : values) { + System.out.println("TestObservable onNext: " + s); + observer.onNext(s); + } + observer.onCompleted(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + }); + System.out.println("starting TestObservable thread"); + t.start(); + System.out.println("done starting TestObservable thread"); + return s; + } + + } + } + +} diff --git a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java new file mode 100644 index 0000000000..f48be8d3f5 --- /dev/null +++ b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java @@ -0,0 +1,253 @@ +package rx.testing; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +public class TrustedObservableTester +{ + private TrustedObservableTester() {} + + public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) + { + return new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + return source.call(new TestingObserver(observer)); + } + }; + } + + public static class TestingObserver implements Observer { + + private final Observer actual; + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean isInCallback = new AtomicBoolean(false); + + public TestingObserver(Observer actual) { + this.actual = actual; + } + + @Override + public void onCompleted() { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onCompleted(); + isInCallback.set(false); + } + + @Override + public void onError(Exception e) { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onError(e); + isInCallback.set(false); + } + + @Override + public void onNext(T args) { + assertFalse("previous call to onCompleted() or onError()", isFinished.get()); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onNext(args); + isInCallback.set(false); + } + + } + + public static class UnitTest { + @Test(expected = AssertionError.class) + public void testDoubleCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + + } + + @Test(expected = AssertionError.class) + public void testCompletedError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testCompletedNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testErrorCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testDoubleError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + + @Test(expected = AssertionError.class) + public void testErrorNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testNextCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testConcurrentNextNext() { + final List threads = new ArrayList(); + final AtomicReference threadFailure = new AtomicReference(); + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(final Observer observer) + { + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("one"); + } + })); + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("two"); + } + })); + return Subscriptions.empty(); + } + })).subscribe(new SlowObserver()); + for (Thread thread : threads) { + thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() + { + @Override + public void uncaughtException(Thread thread, Throwable throwable) + { + threadFailure.set(throwable); + } + }); + thread.start(); + } + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException ignored) { + } + } + // Junit seems pretty bad about exposing test failures inside of created threads. + assertNotNull("exception thrown by thread", threadFailure.get()); + assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass()); + } + + private static class SlowObserver implements Observer + { + @Override + public void onCompleted() + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onError(Exception e) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onNext(String args) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + } + } +}