Skip to content

Implemented the 'elementAt' and 'elementAtOrDefault' operators. see #41 #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationDistinct;
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
Expand Down Expand Up @@ -4186,5 +4187,45 @@ private boolean isInternalImplementation(Object o) {
Package p = o.getClass().getPackage(); // it can be null
return p != null && p.getName().startsWith("rx.operators");
}

/**
* Returns the element at a specified index in a sequence.
*
* @param index
* The zero-based index of the element to retrieve.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is greater than or equal to the number of elements in
* the source sequence.
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public Observable<T> elementAt(int index) {
return create(OperationElementAt.elementAt(this, index));
}

/**
* Returns the element at a specified index in a sequence or the default
* value if the index is out of range.
*
* @param index
* The zero-based index of the element to retrieve.
* @param defaultValue
* The default value.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence, or the default value if the
* index is outside the bounds of the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public Observable<T> elementAtOrDefault(int index, T defaultValue) {
return create(OperationElementAt.elementAtOrDefault(this, index,
defaultValue));
}

}
238 changes: 238 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationElementAt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package rx.operators;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Returns the element at a specified index in a sequence.
*/
public class OperationElementAt {

/**
* Returns the element at a specified index in a sequence.
*
* @param source
* Observable sequence to return the element from.
* @param index
* The zero-based index of the element to retrieve.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is greater than or equal to the number of elements in
* the source sequence.
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public static <T> OnSubscribeFunc<T> elementAt(
Observable<? extends T> source, int index) {
return new ElementAt<T>(source, index, null, false);
}

/**
* Returns the element at a specified index in a sequence or the default
* value if the index is out of range.
*
* @param source
* Observable sequence to return the element from.
* @param index
* The zero-based index of the element to retrieve.
* @param defaultValue
* The default value.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence, or the default value if the
* index is outside the bounds of the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public static <T> OnSubscribeFunc<T> elementAtOrDefault(
Observable<? extends T> source, int index, T defaultValue) {
return new ElementAt<T>(source, index, defaultValue, true);
}

private static class ElementAt<T> implements OnSubscribeFunc<T> {

private final Observable<? extends T> source;
private final int index;
private final boolean hasDefault;
private final T defaultValue;

private ElementAt(Observable<? extends T> source, int index,
T defaultValue, boolean hasDefault) {
this.source = source;
this.index = index;
this.defaultValue = defaultValue;
this.hasDefault = hasDefault;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(source.subscribe(new Observer<T>() {

private AtomicInteger counter = new AtomicInteger();

@Override
public void onNext(T value) {
try {
int currentIndex = counter.getAndIncrement();
if (currentIndex == index) {
observer.onNext(value);
observer.onCompleted();
} else if (currentIndex > index) {
// this will work if the sequence is asynchronous,
// it will have no effect on a synchronous
// observable
subscription.unsubscribe();
}
} catch (Throwable ex) {
observer.onError(ex);
// this will work if the sequence is asynchronous, it
// will have no effect on a synchronous observable
subscription.unsubscribe();
}

}

@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

@Override
public void onCompleted() {
if (index < 0) {
observer.onError(new IndexOutOfBoundsException(index
+ " is out of bounds"));
} else if (counter.get() <= index) {
if (hasDefault) {
observer.onNext(defaultValue);
observer.onCompleted();
} else {
observer.onError(new IndexOutOfBoundsException(
index + " is out of bounds"));
}
}
}
}));
}
}

public static class UnitTest {

@Test
public void testElementAt() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(elementAt(w, 1));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtWithMinusIndex() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAt(w, -1));

try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}

@Test
public void testElementAtWithIndexOutOfBounds()
throws InterruptedException, ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(elementAt(w, 2));
try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}

@Test
public void testElementAtOrDefault() throws InterruptedException,
ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, 1, 0));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtOrDefaultWithIndexOutOfBounds()
throws InterruptedException, ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, 2, 0));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, never()).onNext(2);
verify(aObserver, times(1)).onNext(0);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtOrDefaultWithMinusIndex() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, -1, 0));

try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}
}
}