Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Arrays;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand Down Expand Up @@ -213,6 +214,47 @@ def class ObservableTests {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

@Test
public void testForEach() {
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testForEachWithError() {
try {
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')});
fail("we expect an exception to be thrown");
}catch(Exception e) {
// do nothing as we expect this
}
verify(a, times(0)).received(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verifications could be removed since there are no usages of a in the forEach closure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Submitting a commit to remove those now.

verify(a, times(0)).received(2);
verify(a, times(0)).received(3);
}

def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

public Subscription call(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(50)
}catch(Exception e) {
// ignore
}
observer.onNext(1);
observer.onNext(2);
observer.onNext(3);
observer.onCompleted();
}
}).start();
return Observable.noOpSubscription();
}
}

def class TestFactory {
int counter = 1;
Expand Down
79 changes: 77 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -332,6 +334,80 @@ public void onNext(T args) {
});
}

/**
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
* <p>
* NOTE: This will block even if the Observable is asynchronous.
* <p>
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
*
* @param onNext
* {@link Action1}
* @throws RuntimeException
* if error occurs
*/
public void forEach(final Action1<T> onNext) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionFromOnError = new AtomicReference<Exception>();

subscribe(new Observer<T>() {
public void onCompleted() {
latch.countDown();
}

public void onError(Exception e) {
/*
* If we receive an onError event we set the reference on the outer thread
* so we can git it and throw after the latch.await().
*
* We do this instead of throwing directly since this may be on a different thread and the latch is still waiting.
*/
exceptionFromOnError.set(e);
latch.countDown();
}

public void onNext(T args) {
onNext.call(args);
}
});
// block until the subscription completes and then return
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}

if (exceptionFromOnError.get() != null) {
if (exceptionFromOnError.get() instanceof RuntimeException) {
throw (RuntimeException) exceptionFromOnError.get();
} else {
throw new RuntimeException(exceptionFromOnError.get());
}
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public void forEach(final Object o) {
if (o instanceof Action1) {
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method.
forEach((Action1) o);
}

// lookup and memoize onNext
if (o == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNext = Functions.from(o);

forEach(new Action1() {

public void call(Object args) {
onNext.call(args);
}

});
}

/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
*
Expand Down Expand Up @@ -2543,7 +2619,6 @@ public void testSequenceEqual() {
verify(result, times(1)).onNext(false);
}



}

}
9 changes: 7 additions & 2 deletions rxjava-core/src/main/java/rx/util/AtomicObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ public void onError(Exception e) {

@Override
public void onNext(T args) {
if (!isFinished.get()) {
actual.onNext(args);
try {
if (!isFinished.get()) {
actual.onNext(args);
}
}catch(Exception e) {
// handle errors if the onNext implementation fails, not just if the Observable fails
onError(e);
}
}

Expand Down