Skip to content

Commit 670cee3

Browse files
Merge pull request #594 from zsxwing/start
Implement the 'Start' operator
2 parents 3bcf4e0 + c799e52 commit 670cee3

File tree

2 files changed

+165
-1
lines changed

2 files changed

+165
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
import rx.util.Timestamped;
121121
import rx.util.functions.Action0;
122122
import rx.util.functions.Action1;
123+
import rx.util.functions.Async;
123124
import rx.util.functions.Func0;
124125
import rx.util.functions.Func1;
125126
import rx.util.functions.Func2;
@@ -6329,4 +6330,40 @@ public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Fun
63296330
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
63306331
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
63316332
}
6333+
6334+
/**
6335+
* Invokes the specified function asynchronously, surfacing the result through an observable sequence.
6336+
* <p>
6337+
* Note: The function is called immediately, not during the subscription of the resulting
6338+
* sequence. Multiple subscriptions to the resulting sequence can observe the
6339+
* function's result.
6340+
*
6341+
* @param func
6342+
* Function to run asynchronously.
6343+
* @return An observable sequence exposing the function's result value, or an exception.
6344+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229036(v=vs.103).aspx">MSDN: Observable.Start</a>
6345+
*/
6346+
public static <T> Observable<T> start(Func0<T> func) {
6347+
return Async.toAsync(func).call();
6348+
}
6349+
6350+
/**
6351+
* Invokes the specified function asynchronously on the specified scheduler, surfacing
6352+
* the result through an observable sequence.
6353+
* <p>
6354+
* Note: The function is called immediately, not during the subscription of the resulting
6355+
* sequence. Multiple subscriptions to the resulting sequence can observe the
6356+
* function's result.
6357+
*
6358+
* @param func
6359+
* Function to run asynchronously.
6360+
* @param scheduler
6361+
* Scheduler to run the function on.
6362+
* @return An observable sequence exposing the function's result value, or an exception.
6363+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211721(v=vs.103).aspx">MSDN: Observable.Start</a>
6364+
*/
6365+
public static <T> Observable<T> start(Func0<T> func, Scheduler scheduler) {
6366+
return Async.toAsync(func, scheduler).call();
6367+
}
6368+
63326369
}

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.mockito.InOrder;
3434
import org.mockito.Mock;
3535
import org.mockito.MockitoAnnotations;
36+
import org.mockito.invocation.InvocationOnMock;
37+
import org.mockito.stubbing.Answer;
3638

3739
import rx.Observable.OnSubscribeFunc;
3840
import rx.schedulers.TestScheduler;
@@ -41,6 +43,7 @@
4143
import rx.subscriptions.Subscriptions;
4244
import rx.util.functions.Action0;
4345
import rx.util.functions.Action1;
46+
import rx.util.functions.Func0;
4447
import rx.util.functions.Func1;
4548
import rx.util.functions.Func2;
4649

@@ -947,4 +950,128 @@ public void testRangeWithScheduler() {
947950
inOrder.verify(aObserver, times(1)).onCompleted();
948951
inOrder.verifyNoMoreInteractions();
949952
}
950-
}
953+
954+
@Test
955+
public void testStartWithFunc() {
956+
Func0<String> func = new Func0<String>() {
957+
@Override
958+
public String call() {
959+
return "one";
960+
}
961+
};
962+
assertEquals("one", Observable.start(func).toBlockingObservable().single());
963+
}
964+
965+
@Test(expected = RuntimeException.class)
966+
public void testStartWithFuncError() {
967+
Func0<String> func = new Func0<String>() {
968+
@Override
969+
public String call() {
970+
throw new RuntimeException("Some error");
971+
}
972+
};
973+
Observable.start(func).toBlockingObservable().single();
974+
}
975+
976+
@Test
977+
public void testStartWhenSubscribeRunBeforeFunc() {
978+
TestScheduler scheduler = new TestScheduler();
979+
980+
Func0<String> func = new Func0<String>() {
981+
@Override
982+
public String call() {
983+
return "one";
984+
}
985+
};
986+
987+
Observable<String> observable = Observable.start(func, scheduler);
988+
989+
@SuppressWarnings("unchecked")
990+
Observer<String> observer = mock(Observer.class);
991+
observable.subscribe(observer);
992+
993+
InOrder inOrder = inOrder(observer);
994+
inOrder.verifyNoMoreInteractions();
995+
996+
// Run func
997+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
998+
999+
inOrder.verify(observer, times(1)).onNext("one");
1000+
inOrder.verify(observer, times(1)).onCompleted();
1001+
inOrder.verifyNoMoreInteractions();
1002+
}
1003+
1004+
@Test
1005+
public void testStartWhenSubscribeRunAfterFunc() {
1006+
TestScheduler scheduler = new TestScheduler();
1007+
1008+
Func0<String> func = new Func0<String>() {
1009+
@Override
1010+
public String call() {
1011+
return "one";
1012+
}
1013+
};
1014+
1015+
Observable<String> observable = Observable.start(func, scheduler);
1016+
1017+
// Run func
1018+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
1019+
1020+
@SuppressWarnings("unchecked")
1021+
Observer<String> observer = mock(Observer.class);
1022+
observable.subscribe(observer);
1023+
1024+
InOrder inOrder = inOrder(observer);
1025+
inOrder.verify(observer, times(1)).onNext("one");
1026+
inOrder.verify(observer, times(1)).onCompleted();
1027+
inOrder.verifyNoMoreInteractions();
1028+
}
1029+
1030+
@Test
1031+
public void testStartWithFuncAndMultipleObservers() {
1032+
TestScheduler scheduler = new TestScheduler();
1033+
1034+
@SuppressWarnings("unchecked")
1035+
Func0<String> func = (Func0<String>) mock(Func0.class);
1036+
doAnswer(new Answer<String>() {
1037+
@Override
1038+
public String answer(InvocationOnMock invocation) throws Throwable {
1039+
return "one";
1040+
}
1041+
}).when(func).call();
1042+
1043+
Observable<String> observable = Observable.start(func, scheduler);
1044+
1045+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
1046+
1047+
@SuppressWarnings("unchecked")
1048+
Observer<String> observer1 = mock(Observer.class);
1049+
@SuppressWarnings("unchecked")
1050+
Observer<String> observer2 = mock(Observer.class);
1051+
@SuppressWarnings("unchecked")
1052+
Observer<String> observer3 = mock(Observer.class);
1053+
1054+
observable.subscribe(observer1);
1055+
observable.subscribe(observer2);
1056+
observable.subscribe(observer3);
1057+
1058+
InOrder inOrder;
1059+
inOrder = inOrder(observer1);
1060+
inOrder.verify(observer1, times(1)).onNext("one");
1061+
inOrder.verify(observer1, times(1)).onCompleted();
1062+
inOrder.verifyNoMoreInteractions();
1063+
1064+
inOrder = inOrder(observer2);
1065+
inOrder.verify(observer2, times(1)).onNext("one");
1066+
inOrder.verify(observer2, times(1)).onCompleted();
1067+
inOrder.verifyNoMoreInteractions();
1068+
1069+
inOrder = inOrder(observer3);
1070+
inOrder.verify(observer3, times(1)).onNext("one");
1071+
inOrder.verify(observer3, times(1)).onCompleted();
1072+
inOrder.verifyNoMoreInteractions();
1073+
1074+
verify(func, times(1)).call();
1075+
}
1076+
1077+
}

0 commit comments

Comments
 (0)