Skip to content

Commit d7997fb

Browse files
Observable.from with scheduler
1 parent 728f183 commit d7997fb

File tree

4 files changed

+91
-36
lines changed

4 files changed

+91
-36
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ public static <T> Observable<T> error(Throwable exception, Scheduler scheduler)
732732
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
733733
*/
734734
public static <T> Observable<T> from(Iterable<? extends T> iterable) {
735-
return create(OperationToObservableIterable.toObservableIterable(iterable));
735+
return from(iterable, Schedulers.currentThread());
736736
}
737737

738738
/**
@@ -751,7 +751,7 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
751751
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
752752
*/
753753
public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
754-
return from(iterable).observeOn(scheduler);
754+
return create(OperationToObservableIterable.toObservableIterable(iterable, scheduler));
755755
}
756756

757757
/**
@@ -764,14 +764,35 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler s
764764
* {@link Subscription} is returned, it is not possible to unsubscribe from
765765
* the sequence before it completes.
766766
*
767-
* @param items the source sequence
767+
* @param items the source array
768768
* @param <T> the type of items in the Array and the type of items to be
769769
* emitted by the resulting Observable
770770
* @return an Observable that emits each item in the source Array
771771
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
772772
*/
773773
public static <T> Observable<T> from(T[] items) {
774-
return create(OperationToObservableIterable.toObservableIterable(Arrays.asList(items)));
774+
return from(Arrays.asList(items));
775+
}
776+
777+
/**
778+
* Converts an Array into an Observable.
779+
* <p>
780+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
781+
* <p>
782+
* Note: the entire array is immediately emitted each time an
783+
* {@link Observer} subscribes. Since this occurs before the
784+
* {@link Subscription} is returned, it is not possible to unsubscribe from
785+
* the sequence before it completes.
786+
*
787+
* @param items the source array
788+
* @param scheduler the scheduler to emit the items of the array
789+
* @param <T> the type of items in the Array and the type of items to be
790+
* emitted by the resulting Observable
791+
* @return an Observable that emits each item in the source Array
792+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
793+
*/
794+
public static <T> Observable<T> from(T[] items, Scheduler scheduler) {
795+
return from(Arrays.asList(items), scheduler);
775796
}
776797

777798
/**
@@ -828,7 +849,7 @@ public static <T> Observable<T> from(T t1, T t2) {
828849
* subscribes. Since this occurs before the {@link Subscription} is
829850
* returned, it is not possible to unsubscribe from the sequence before it
830851
* completes.
831-
*
852+
*
832853
* @param t1 first item
833854
* @param t2 second item
834855
* @param t3 third item
@@ -1074,7 +1095,7 @@ public static Observable<Integer> range(int start, int count) {
10741095
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">Observable.Range Method (Int32, Int32, IScheduler)</a>
10751096
*/
10761097
public static Observable<Integer> range(int start, int count, Scheduler scheduler) {
1077-
return range(start, count).observeOn(scheduler);
1098+
return from(Range.createWithCount(start, count), scheduler);
10781099
}
10791100

10801101
/**
@@ -1121,10 +1142,7 @@ public static <T> Observable<T> defer(Func0<? extends Observable<? extends T>> o
11211142
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
11221143
*/
11231144
public static <T> Observable<T> just(T value) {
1124-
List<T> list = new ArrayList<T>();
1125-
list.add(value);
1126-
1127-
return from(list);
1145+
return from(Arrays.asList((value)));
11281146
}
11291147

11301148
/**
@@ -1143,7 +1161,7 @@ public static <T> Observable<T> just(T value) {
11431161
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
11441162
*/
11451163
public static <T> Observable<T> just(T value, Scheduler scheduler) {
1146-
return just(value).observeOn(scheduler);
1164+
return from(Arrays.asList((value)), scheduler);
11471165
}
11481166

11491167
/**

rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@
1717

1818
import rx.Observable.OnSubscribeFunc;
1919
import rx.Observer;
20+
import rx.Scheduler;
2021
import rx.Subscription;
22+
import rx.schedulers.Schedulers;
2123
import rx.subscriptions.Subscriptions;
24+
import rx.util.functions.Action0;
25+
import rx.util.functions.Action1;
26+
27+
import java.util.Iterator;
2228

2329
/**
2430
* Converts an Iterable sequence into an Observable.
@@ -30,24 +36,42 @@
3036
*/
3137
public final class OperationToObservableIterable<T> {
3238

39+
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
40+
return new ToObservableIterable<T>(list, scheduler);
41+
}
42+
3343
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
34-
return new ToObservableIterable<T>(list);
44+
return toObservableIterable(list, Schedulers.currentThread());
3545
}
3646

3747
private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
38-
public ToObservableIterable(Iterable<? extends T> list) {
48+
49+
public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
3950
this.iterable = list;
51+
this.scheduler = scheduler;
4052
}
4153

42-
public Iterable<? extends T> iterable;
43-
44-
public Subscription onSubscribe(Observer<? super T> observer) {
45-
for (T item : iterable) {
46-
observer.onNext(item);
47-
}
48-
observer.onCompleted();
54+
Scheduler scheduler;
55+
final Iterable<? extends T> iterable;
4956

50-
return Subscriptions.empty();
57+
public Subscription onSubscribe(final Observer<? super T> observer) {
58+
final Iterator<? extends T> iterator = iterable.iterator();
59+
return scheduler.schedule(new Action1<Action0>() {
60+
@Override
61+
public void call(Action0 self) {
62+
try {
63+
if (iterator.hasNext()) {
64+
T x = iterator.next();
65+
observer.onNext(x);
66+
self.call();
67+
} else {
68+
observer.onCompleted();
69+
}
70+
} catch (Exception e) {
71+
observer.onError(e);
72+
}
73+
}
74+
});
5175
}
5276
}
5377
}

rxjava-core/src/test/java/rx/ObservableWindowTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class ObservableWindowTests {
3030
@Test
3131
public void testWindow() {
3232
final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();
33+
/*
3334
Observable.from(1, 2, 3, 4, 5, 6)
3435
.window(3).map(new Func1<Observable<Integer>, List<Integer>>() {
3536
@@ -45,8 +46,22 @@ public void call(List<Integer> t) {
4546
lists.add(t);
4647
}
4748
});
49+
*/
4850

49-
assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
51+
Observable.concat(Observable.from(1, 2, 3, 4, 5, 6).window(3).map(new Func1<Observable<Integer>, Observable<List<Integer>>>() {
52+
@Override
53+
public Observable<List<Integer>> call(Observable<Integer> xs) {
54+
return xs.toList();
55+
}
56+
})).toBlockingObservable().forEach(new Action1<List<Integer>>() {
57+
58+
@Override
59+
public void call(List<Integer> xs) {
60+
lists.add(xs);
61+
}
62+
});
63+
64+
assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[]{1, 2, 3});
5065
assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
5166
assertEquals(2, lists.size());
5267

rxjava-core/src/test/java/rx/operators/OperationWindowTest.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import rx.Observable;
2929
import rx.Observer;
3030
import rx.Subscription;
31+
import rx.schedulers.Schedulers;
3132
import rx.schedulers.TestScheduler;
3233
import rx.subscriptions.Subscriptions;
3334
import rx.util.functions.Action0;
@@ -44,21 +45,18 @@ public void before() {
4445
scheduler = new TestScheduler();
4546
}
4647

47-
private static <T> List<List<T>> toLists(Observable<Observable<T>> observable) {
48-
final List<T> list = new ArrayList<T>();
49-
final List<List<T>> lists = new ArrayList<List<T>>();
48+
private static <T> List<List<T>> toLists(Observable<Observable<T>> observables) {
5049

51-
observable.subscribe(new Action1<Observable<T>>() {
50+
final List<List<T>> lists = new ArrayList<List<T>>();
51+
Observable.concat(observables.map(new Func1<Observable<T>, Observable<List<T>>>() {
5252
@Override
53-
public void call(Observable<T> tObservable) {
54-
tObservable.subscribe(new Action1<T>() {
55-
@Override
56-
public void call(T t) {
57-
list.add(t);
58-
}
59-
});
60-
lists.add(new ArrayList<T>(list));
61-
list.clear();
53+
public Observable<List<T>> call(Observable<T> xs) {
54+
return xs.toList();
55+
}
56+
})).toBlockingObservable().forEach(new Action1<List<T>>() {
57+
@Override
58+
public void call(List<T> xs) {
59+
lists.add(xs);
6260
}
6361
});
6462
return lists;
@@ -90,7 +88,7 @@ public void testSkipAndCountGaplessEindows() {
9088

9189
@Test
9290
public void testOverlappingWindows() {
93-
Observable<String> subject = Observable.from("zero", "one", "two", "three", "four", "five");
91+
Observable<String> subject = Observable.from(new String[]{"zero", "one", "two", "three", "four", "five"}, Schedulers.currentThread());
9492
Observable<Observable<String>> windowed = Observable.create(window(subject, 3, 1));
9593

9694
List<List<String>> windows = toLists(windowed);

0 commit comments

Comments
 (0)