diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 73a3b3dc13..43100e1096 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -15,17 +15,16 @@ */ package rx.operators; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +36,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.concurrency.Schedulers; import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; @@ -68,6 +68,10 @@ public Iterator iterator() { private static class NextIterator implements Iterator { private final NextObserver observer; + private T next; + private boolean hasNext = true; + private boolean isNextConsumed = true; + private Throwable error = null; private NextIterator(NextObserver observer) { this.observer = observer; @@ -75,24 +79,62 @@ private NextIterator(NextObserver observer) { @Override public boolean hasNext() { - return !observer.isCompleted(false); - } - - @Override - public T next() { - if (observer.isCompleted(true)) { - throw new IllegalStateException("Observable is completed"); + if(error != null) { + // If any error has already been thrown, throw it again. + throw Exceptions.propagate(error); } + // Since an iterator should not be used in different thread, + // so we do not need any synchronization. + if(hasNext == false) { + // the iterator has reached the end. + return false; + } + if(isNextConsumed == false) { + // next has not been used yet. + return true; + } + return moveToNext(); + } - observer.await(); - + private boolean moveToNext() { try { - return observer.takeNext(); + Notification nextNotification = observer.takeNext(); + if(nextNotification.isOnNext()) { + isNextConsumed = false; + next = nextNotification.getValue(); + return true; + } + // If an observable is completed or fails, + // hasNext() always return false. + hasNext = false; + if(nextNotification.isOnCompleted()) { + return false; + } + if(nextNotification.isOnError()) { + error = nextNotification.getThrowable(); + throw Exceptions.propagate(error); + } + throw new IllegalStateException("Should not reach here"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw Exceptions.propagate(e); + error = e; + throw Exceptions.propagate(error); } + } + @Override + public T next() { + if(error != null) { + // If any error has already been thrown, throw it again. + throw Exceptions.propagate(error); + } + if(hasNext()) { + isNextConsumed = true; + return next; + } + else { + throw new NoSuchElementException("No more elements"); + } } @Override @@ -124,7 +166,7 @@ public void onNext(Notification args) { Notification concurrentItem = buf.poll(); // in case if we won race condition with onComplete/onError method - if (!concurrentItem.isOnNext()) { + if (concurrentItem != null && !concurrentItem.isOnNext()) { toOffer = concurrentItem; } } @@ -132,137 +174,205 @@ public void onNext(Notification args) { } - public void await() { + public Notification takeNext() throws InterruptedException { waiting.set(true); + return buf.take(); } - public boolean isCompleted(boolean rethrowExceptionIfExists) { - Notification lastItem = buf.peek(); - if (lastItem == null) { - return false; - } + } - if (lastItem.isOnError()) { - if (rethrowExceptionIfExists) { - throw Exceptions.propagate(lastItem.getThrowable()); - } else { - return true; - } - } + public static class UnitTest { - return lastItem.isOnCompleted(); + private void fireOnNextInNewThread(final Subject o, final String value) { + new Thread() { + @Override + public void run() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore + } + o.onNext(value); + } + }.start(); } - public T takeNext() throws InterruptedException { - Notification next = buf.take(); - - if (next.isOnError()) { - throw Exceptions.propagate(next.getThrowable()); - } - - if (next.isOnCompleted()) { - throw new IllegalStateException("Observable is completed"); - } - - return next.getValue(); - + private void fireOnErrorInNewThread(final Subject o) { + new Thread() { + @Override + public void run() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore + } + o.onError(new TestException()); + } + }.start(); } - } - - public static class UnitTest { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); @Test - public void testNext() throws Throwable { + public void testNext() { Subject obs = PublishSubject.create(); - Iterator it = next(obs).iterator(); - + fireOnNextInNewThread(obs, "one"); assertTrue(it.hasNext()); + assertEquals("one", it.next()); - Future next = nextAsync(it); - Thread.sleep(100); - obs.onNext("one"); - assertEquals("one", next.get()); - - assertTrue(it.hasNext()); - - next = nextAsync(it); - Thread.sleep(100); - obs.onNext("two"); - assertEquals("two", next.get()); - + fireOnNextInNewThread(obs, "two"); assertTrue(it.hasNext()); + assertEquals("two", it.next()); obs.onCompleted(); + assertFalse(it.hasNext()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } + // If the observable is completed, hasNext always returns false and next always throw a NoSuchElementException. assertFalse(it.hasNext()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } } - @Test(expected = TestException.class) - public void testOnError() throws Throwable { + @Test + public void testNextWithError() { Subject obs = PublishSubject.create(); - Iterator it = next(obs).iterator(); - + fireOnNextInNewThread(obs, "one"); assertTrue(it.hasNext()); + assertEquals("one", it.next()); - Future next = nextAsync(it); - Thread.sleep(100); - obs.onNext("one"); - assertEquals("one", next.get()); + fireOnErrorInNewThread(obs); + try { + it.hasNext(); + fail("Expected an TestException"); + } + catch(TestException e) { + } - assertTrue(it.hasNext()); + assertErrorAfterObservableFail(it); + } - next = nextAsync(it); - Thread.sleep(100); - obs.onError(new TestException()); + @Test + public void testNextWithEmpty() { + Observable obs = Observable.empty().observeOn(Schedulers.newThread()); + Iterator it = next(obs).iterator(); + + assertFalse(it.hasNext()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } + + // If the observable is completed, hasNext always returns false and next always throw a NoSuchElementException. + assertFalse(it.hasNext()); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } + } + + @Test + public void testOnError() throws Throwable { + Subject obs = PublishSubject.create(); + Iterator it = next(obs).iterator(); + obs.onError(new TestException()); try { - next.get(); - } catch (ExecutionException e) { - throw e.getCause(); + it.hasNext(); + fail("Expected an TestException"); } + catch(TestException e) { + // successful + } + + assertErrorAfterObservableFail(it); } @Test - public void testOnErrorViaHasNext() throws Throwable { + public void testOnErrorInNewThread() { Subject obs = PublishSubject.create(); - Iterator it = next(obs).iterator(); - assertTrue(it.hasNext()); - - Future next = nextAsync(it); - Thread.sleep(100); - obs.onNext("one"); - assertEquals("one", next.get()); + fireOnErrorInNewThread(obs); - assertTrue(it.hasNext()); + try { + it.hasNext(); + fail("Expected an TestException"); + } + catch(TestException e) { + // successful + } - next = nextAsync(it); - Thread.sleep(100); - obs.onError(new TestException()); + assertErrorAfterObservableFail(it); + } - // this should not throw an exception but instead just return false + private void assertErrorAfterObservableFail(Iterator it) { + // After the observable fails, hasNext and next always throw the exception. try { - assertFalse(it.hasNext()); - } catch (Throwable e) { - fail("should not have received exception"); - e.printStackTrace(); + it.hasNext(); + fail("hasNext should throw a TestException"); + } + catch(TestException e){ + } + try { + it.next(); + fail("next should throw a TestException"); + } + catch(TestException e){ } } - private Future nextAsync(final Iterator it) throws Throwable { + @Test + public void testNextWithOnlyUsingNextMethod() { + Subject obs = PublishSubject.create(); + Iterator it = next(obs).iterator(); + fireOnNextInNewThread(obs, "one"); + assertEquals("one", it.next()); - return executor.submit(new Callable() { + fireOnNextInNewThread(obs, "two"); + assertEquals("two", it.next()); - @Override - public String call() throws Exception { - return it.next(); - } - }); + obs.onCompleted(); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } + } + + @Test + public void testNextWithCallingHasNextMultipleTimes() { + Subject obs = PublishSubject.create(); + Iterator it = next(obs).iterator(); + fireOnNextInNewThread(obs, "one"); + assertTrue(it.hasNext()); + assertTrue(it.hasNext()); + assertTrue(it.hasNext()); + assertTrue(it.hasNext()); + assertEquals("one", it.next()); + + obs.onCompleted(); + try { + it.next(); + fail("At the end of an iterator should throw a NoSuchElementException"); + } + catch(NoSuchElementException e){ + } } @SuppressWarnings("serial") @@ -329,6 +439,8 @@ public void run() { assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT); assertTrue(it.hasNext()); + int d = it.next(); + assertTrue(d > c); // shut down the thread running.set(false);