Skip to content

Commit b960b98

Browse files
Merge pull request ReactiveX#147 from benjchristensen/issue-45
Operator: forEach
2 parents d791e87 + 88fa9e7 commit b960b98

File tree

3 files changed

+127
-4
lines changed

3 files changed

+127
-4
lines changed

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.Arrays;
2222

2323
import org.junit.Before;
2424
import org.junit.Test;
25+
import static org.junit.Assert.*;
2526
import org.mockito.Mock;
2627
import org.mockito.MockitoAnnotations;
2728

@@ -213,6 +214,44 @@ def class ObservableTests {
213214
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
214215
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
215216
}
217+
218+
@Test
219+
public void testForEach() {
220+
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)});
221+
verify(a, times(1)).received(1);
222+
verify(a, times(1)).received(2);
223+
verify(a, times(1)).received(3);
224+
}
225+
226+
@Test
227+
public void testForEachWithError() {
228+
try {
229+
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')});
230+
fail("we expect an exception to be thrown");
231+
}catch(Exception e) {
232+
// do nothing as we expect this
233+
}
234+
}
235+
236+
def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
237+
238+
public Subscription call(final Observer<Integer> observer) {
239+
new Thread(new Runnable() {
240+
public void run() {
241+
try {
242+
Thread.sleep(50)
243+
}catch(Exception e) {
244+
// ignore
245+
}
246+
observer.onNext(1);
247+
observer.onNext(2);
248+
observer.onNext(3);
249+
observer.onCompleted();
250+
}
251+
}).start();
252+
return Observable.noOpSubscription();
253+
}
254+
}
216255

217256
def class TestFactory {
218257
int counter = 1;

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import java.util.Arrays;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.Future;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicReference;
2830

2931
import org.junit.Before;
3032
import org.junit.Test;
@@ -332,6 +334,84 @@ public void onNext(T args) {
332334
});
333335
}
334336

337+
/**
338+
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
339+
* <p>
340+
* NOTE: This will block even if the Observable is asynchronous.
341+
* <p>
342+
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
343+
*
344+
* @param onNext
345+
* {@link Action1}
346+
* @throws RuntimeException
347+
* if error occurs
348+
*/
349+
public void forEach(final Action1<T> onNext) {
350+
final CountDownLatch latch = new CountDownLatch(1);
351+
final AtomicReference<Exception> exceptionFromOnError = new AtomicReference<Exception>();
352+
353+
subscribe(new Observer<T>() {
354+
public void onCompleted() {
355+
latch.countDown();
356+
}
357+
358+
public void onError(Exception e) {
359+
/*
360+
* If we receive an onError event we set the reference on the outer thread
361+
* so we can git it and throw after the latch.await().
362+
*
363+
* We do this instead of throwing directly since this may be on a different thread and the latch is still waiting.
364+
*/
365+
exceptionFromOnError.set(e);
366+
latch.countDown();
367+
}
368+
369+
public void onNext(T args) {
370+
onNext.call(args);
371+
}
372+
});
373+
// block until the subscription completes and then return
374+
try {
375+
latch.await();
376+
} catch (InterruptedException e) {
377+
// set the interrupted flag again so callers can still get it
378+
// for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780
379+
Thread.currentThread().interrupt();
380+
// using Runtime so it is not checked
381+
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
382+
}
383+
384+
if (exceptionFromOnError.get() != null) {
385+
if (exceptionFromOnError.get() instanceof RuntimeException) {
386+
throw (RuntimeException) exceptionFromOnError.get();
387+
} else {
388+
throw new RuntimeException(exceptionFromOnError.get());
389+
}
390+
}
391+
}
392+
393+
@SuppressWarnings({ "rawtypes", "unchecked" })
394+
public void forEach(final Object o) {
395+
if (o instanceof Action1) {
396+
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method.
397+
forEach((Action1) o);
398+
}
399+
400+
// lookup and memoize onNext
401+
if (o == null) {
402+
throw new RuntimeException("onNext must be implemented");
403+
}
404+
final FuncN onNext = Functions.from(o);
405+
406+
forEach(new Action1() {
407+
408+
public void call(Object args) {
409+
onNext.call(args);
410+
}
411+
412+
});
413+
}
414+
335415
/**
336416
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
337417
*
@@ -2543,7 +2623,6 @@ public void testSequenceEqual() {
25432623
verify(result, times(1)).onNext(false);
25442624
}
25452625

2546-
2547-
25482626
}
2627+
25492628
}

rxjava-core/src/main/java/rx/util/AtomicObserver.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,13 @@ public void onError(Exception e) {
6767

6868
@Override
6969
public void onNext(T args) {
70-
if (!isFinished.get()) {
71-
actual.onNext(args);
70+
try {
71+
if (!isFinished.get()) {
72+
actual.onNext(args);
73+
}
74+
}catch(Exception e) {
75+
// handle errors if the onNext implementation fails, not just if the Observable fails
76+
onError(e);
7277
}
7378
}
7479

0 commit comments

Comments
 (0)