Skip to content

Implemented distinct and distinctUntilChanged variants using a comparator #380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 13, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -2948,6 +2949,35 @@ public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keyS
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to
* a comparator.
*
* @param equalityComparator
* a comparator for deciding whether two emitted items are equal or not
* @return an Observable of sequentially distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229776%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
*/
public <U> Observable<T> distinctUntilChanged(Comparator<T> equalityComparator) {
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, equalityComparator));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to
* a key selector function and a comparator.
*
* @param keySelector
* a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially
* distinct from another one or not
* @param equalityComparator
* a comparator for deciding whether two emitted item keys are equal or not
* @return an Observable of sequentially distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229533%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
*/
public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector, equalityComparator));
}

/**
* Returns an Observable that forwards all distinct items emitted from the source Observable.
*
Expand All @@ -2958,6 +2988,19 @@ public Observable<T> distinct() {
return create(OperationDistinct.distinct(this));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according to
* a comparator.
*
* @param equalityComparator
* a comparator for deciding whether two emitted items are equal or not
* @return an Observable of distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211630(v=vs.103).aspx">MSDN: Observable.distinct</a>
*/
public <U> Observable<T> distinct(Comparator<T> equalityComparator) {
return create(OperationDistinct.distinct(this, equalityComparator));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according to
* a key selector function.
Expand All @@ -2972,6 +3015,22 @@ public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector) {
return create(OperationDistinct.distinct(this, keySelector));
}

/**
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according to
* a key selector function and a comparator.
*
* @param keySelector
* a function that projects an emitted item to a key value which is used for deciding whether an item is
* distinct from another one or not
* @param equalityComparator
* a comparator for deciding whether two emitted item keys are equal or not
* @return an Observable of distinct items
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229050(v=vs.103).aspx">MSDN: Observable.distinct</a>
*/
public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
return create(OperationDistinct.distinct(this, keySelector, equalityComparator));
}

/**
* Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}.
* <p>
Expand Down
161 changes: 153 additions & 8 deletions rxjava-core/src/main/java/rx/operators/OperationDistinct.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import static rx.Observable.empty;
import static rx.Observable.from;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.junit.Before;
Expand Down Expand Up @@ -57,6 +60,30 @@ public static <T, U> OnSubscribeFunc<T> distinct(Observable<? extends T> source,
return new Distinct<T, U>(source, keySelector);
}

/**
* Returns an Observable that emits all distinct items emitted by the source
* @param source
* The source Observable to emit the distinct items for.
* @param equalityComparator
* The comparator to use for deciding whether to consider two items as equal or not.
* @return A subscription function for creating the target Observable.
*/
public static <T> OnSubscribeFunc<T> distinct(Observable<? extends T> source, Comparator<T> equalityComparator) {
return new DistinctWithComparator<T, T>(source, Functions.<T>identity(), equalityComparator);
}

/**
* Returns an Observable that emits all distinct items emitted by the source
* @param source
* The source Observable to emit the distinct items for.
* @param equalityComparator
* The comparator to use for deciding whether to consider the two item keys as equal or not.
* @return A subscription function for creating the target Observable.
*/
public static <T, U> OnSubscribeFunc<T> distinct(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
return new DistinctWithComparator<T, U>(source, keySelector, equalityComparator);
}

/**
* Returns an Observable that emits all distinct items emitted by the source
* @param source
Expand Down Expand Up @@ -93,16 +120,67 @@ public void onError(Throwable e) {

@Override
public void onNext(T next) {
try {
U nextKey = keySelector.call(next);
if (!emittedKeys.contains(nextKey)) {
emittedKeys.add(nextKey);
observer.onNext(next);
U nextKey = keySelector.call(next);
if (!emittedKeys.contains(nextKey)) {
emittedKeys.add(nextKey);
observer.onNext(next);
}
}
});

return Subscriptions.create(new Action0() {
@Override
public void call() {
sourceSub.unsubscribe();
}
});
}
}

private static class DistinctWithComparator<T, U> implements OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final Func1<? super T, ? extends U> keySelector;
private final Comparator<U> equalityComparator;

private DistinctWithComparator(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
this.source = source;
this.keySelector = keySelector;
this.equalityComparator = equalityComparator;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final Subscription sourceSub = source.subscribe(new Observer<T>() {

// due to the totally arbitrary equality comparator, we can't use anything more efficient than lists here
private final List<U> emittedKeys = new ArrayList<U>();

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onNext(T next) {
U nextKey = keySelector.call(next);
if (!alreadyEmitted(nextKey)) {
emittedKeys.add(nextKey);
observer.onNext(next);
}
}

private boolean alreadyEmitted(U newKey) {
for (U key: emittedKeys) {
if (equalityComparator.compare(key, newKey) == 0) {
return true;
}
} catch (Throwable t) {
// keySelector is a user function, may throw something
observer.onError(t);
}
return false;
}
});

Expand All @@ -118,15 +196,27 @@ public void call() {
public static class UnitTest {
@Mock
Observer<? super String> w;
@Mock
Observer<? super String> w2;

// nulls lead to exceptions
final Func1<String, String> TO_UPPER_WITH_EXCEPTION = new Func1<String, String>() {
@Override
public String call(String s) {
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};

final Comparator<String> COMPARE_LENGTH = new Comparator<String>() {
@Override
public int compare(String s1, String s2) {
return s1.length() - s2.length();
}
};

@Before
public void before() {
initMocks(this);
Expand Down Expand Up @@ -182,6 +272,61 @@ public void testDistinctOfNormalSourceWithKeySelector() {
verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfNormalSourceWithComparator() {
Observable<String> src = from("1", "12", "123", "aaa", "321", "12", "21", "1", "12345");
create(distinct(src, COMPARE_LENGTH)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("1");
inOrder.verify(w, times(1)).onNext("12");
inOrder.verify(w, times(1)).onNext("123");
inOrder.verify(w, times(1)).onNext("12345");
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfNormalSourceWithKeySelectorAndComparator() {
Observable<String> src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd");
create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("x");
inOrder.verify(w, times(1)).onNext("abc");
inOrder.verify(w, times(1)).onNext("abcd");
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfNormalSourceWithKeySelectorAndComparatorAndTwoSubscriptions() {
Observable<String> src = from("a", "x", "ab", "abc", "cba", "de", "x", "a", "abcd");
create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("x");
create(distinct(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w2);
inOrder.verify(w, times(1)).onNext("abc");
inOrder.verify(w, times(1)).onNext("abcd");
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));

InOrder inOrder2 = inOrder(w2);
inOrder2.verify(w2, times(1)).onNext("a");
inOrder2.verify(w2, times(1)).onNext("x");
inOrder2.verify(w2, times(1)).onNext("abc");
inOrder2.verify(w2, times(1)).onNext("abcd");
inOrder2.verify(w2, times(1)).onCompleted();
inOrder2.verify(w2, never()).onNext(anyString());
verify(w2, never()).onError(any(Throwable.class));
}

@Test
public void testDistinctOfSourceWithNulls() {
Observable<String> src = from(null, "a", "a", null, null, "b", null);
Expand Down
Loading