Skip to content

Commit d511fa3

Browse files
Merge pull request #506 from akarnokd/AndPattern2
Operators: And, Then, When
2 parents 2b5ff00 + 1e7eabd commit d511fa3

20 files changed

+1714
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.concurrent.TimeUnit;
2727

2828
import rx.concurrency.Schedulers;
29+
import rx.joins.Pattern2;
30+
import rx.joins.Plan0;
2931
import rx.observables.BlockingObservable;
3032
import rx.observables.ConnectableObservable;
3133
import rx.observables.GroupedObservable;
@@ -51,6 +53,7 @@
5153
import rx.operators.OperationFirstOrDefault;
5254
import rx.operators.OperationGroupBy;
5355
import rx.operators.OperationInterval;
56+
import rx.operators.OperationJoinPatterns;
5457
import rx.operators.OperationLast;
5558
import rx.operators.OperationMap;
5659
import rx.operators.OperationMaterialize;
@@ -5689,5 +5692,173 @@ private boolean isInternalImplementation(Object o) {
56895692
return isInternal;
56905693
}
56915694
}
5692-
5695+
/**
5696+
* Creates a pattern that matches when both observable sequences have an available element.
5697+
* @param right Observable sequence to match with the left sequence.
5698+
* @return Pattern object that matches when both observable sequences have an available element.
5699+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229153.aspx'>MSDN: Observable.And</a>
5700+
* @throws NullPointerException if right is null
5701+
*/
5702+
public <T2> Pattern2<T, T2> and(Observable<T2> right) {
5703+
return OperationJoinPatterns.and(this, right);
5704+
}
5705+
/**
5706+
* Matches when the observable sequence has an available element and projects the element by invoking the selector function.
5707+
* @param selector Selector that will be invoked for elements in the source sequence.
5708+
* @return Plan that produces the projected results, to be fed (with other plans) to the When operator.
5709+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211662.aspx'>MSDN: Observable.Then</a>
5710+
* @throws NullPointerException if selector is null
5711+
*/
5712+
public <R> Plan0<R> then(Func1<T, R> selector) {
5713+
return OperationJoinPatterns.then(this, selector);
5714+
}
5715+
/**
5716+
* Joins together the results from several patterns.
5717+
* @param plans A series of plans created by use of the Then operator on patterns.
5718+
* @return An observable sequence with the results from matching several patterns.
5719+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5720+
* @throws NullPointerException if plans is null
5721+
*/
5722+
public static <R> Observable<R> when(Plan0<R>... plans) {
5723+
return create(OperationJoinPatterns.when(plans));
5724+
}
5725+
/**
5726+
* Joins together the results from several patterns.
5727+
* @param plans A series of plans created by use of the Then operator on patterns.
5728+
* @return An observable sequence with the results from matching several patterns.
5729+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229558.aspx'>MSDN: Observable.When</a>
5730+
* @throws NullPointerException if plans is null
5731+
*/
5732+
public static <R> Observable<R> when(Iterable<? extends Plan0<R>> plans) {
5733+
if (plans == null) {
5734+
throw new NullPointerException("plans");
5735+
}
5736+
return create(OperationJoinPatterns.when(plans));
5737+
}
5738+
/**
5739+
* Joins the results from a pattern.
5740+
* @param p1 the plan to join
5741+
* @return An observable sequence with the results from matching a pattern
5742+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5743+
*/
5744+
@SuppressWarnings("unchecked")
5745+
public static <R> Observable<R> when(Plan0<R> p1) {
5746+
return create(OperationJoinPatterns.when(p1));
5747+
}
5748+
/**
5749+
* Joins together the results from several patterns.
5750+
* @param p1 a plan
5751+
* @param p2 a plan
5752+
* @return An observable sequence with the results from matching several patterns
5753+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5754+
*/
5755+
@SuppressWarnings("unchecked")
5756+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2) {
5757+
return create(OperationJoinPatterns.when(p1, p2));
5758+
}
5759+
/**
5760+
* Joins together the results from several patterns.
5761+
* @param p1 a plan
5762+
* @param p2 a plan
5763+
* @param p3 a plan
5764+
* @return An observable sequence with the results from matching several patterns
5765+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5766+
*/
5767+
@SuppressWarnings("unchecked")
5768+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3) {
5769+
return create(OperationJoinPatterns.when(p1, p2, p3));
5770+
}
5771+
/**
5772+
* Joins together the results from several patterns.
5773+
* @param p1 a plan
5774+
* @param p2 a plan
5775+
* @param p3 a plan
5776+
* @param p4 a plan
5777+
* @return An observable sequence with the results from matching several patterns
5778+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5779+
*/
5780+
@SuppressWarnings("unchecked")
5781+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4) {
5782+
return create(OperationJoinPatterns.when(p1, p2, p3, p4));
5783+
}
5784+
/**
5785+
* Joins together the results from several patterns.
5786+
* @param p1 a plan
5787+
* @param p2 a plan
5788+
* @param p3 a plan
5789+
* @param p4 a plan
5790+
* @param p5 a plan
5791+
* @return An observable sequence with the results from matching several patterns
5792+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5793+
*/
5794+
@SuppressWarnings("unchecked")
5795+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5) {
5796+
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5));
5797+
}
5798+
/**
5799+
* Joins together the results from several patterns.
5800+
* @param p1 a plan
5801+
* @param p2 a plan
5802+
* @param p3 a plan
5803+
* @param p4 a plan
5804+
* @param p5 a plan
5805+
* @param p6 a plan
5806+
* @return An observable sequence with the results from matching several patterns
5807+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5808+
*/
5809+
@SuppressWarnings("unchecked")
5810+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6) {
5811+
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6));
5812+
}
5813+
/**
5814+
* Joins together the results from several patterns.
5815+
* @param p1 a plan
5816+
* @param p2 a plan
5817+
* @param p3 a plan
5818+
* @param p4 a plan
5819+
* @param p5 a plan
5820+
* @param p6 a plan
5821+
* @param p7 a plan
5822+
* @return An observable sequence with the results from matching several patterns
5823+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5824+
*/
5825+
@SuppressWarnings("unchecked")
5826+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7) {
5827+
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7));
5828+
}
5829+
/**
5830+
* Joins together the results from several patterns.
5831+
* @param p1 a plan
5832+
* @param p2 a plan
5833+
* @param p3 a plan
5834+
* @param p4 a plan
5835+
* @param p5 a plan
5836+
* @param p6 a plan
5837+
* @param p7 a plan
5838+
* @param p8 a plan
5839+
* @return An observable sequence with the results from matching several patterns
5840+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5841+
*/
5842+
@SuppressWarnings("unchecked")
5843+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8) {
5844+
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8));
5845+
}
5846+
/**
5847+
* Joins together the results from several patterns.
5848+
* @param p1 a plan
5849+
* @param p2 a plan
5850+
* @param p3 a plan
5851+
* @param p4 a plan
5852+
* @param p5 a plan
5853+
* @param p6 a plan
5854+
* @param p7 a plan
5855+
* @param p8 a plan
5856+
* @param p9 a plan
5857+
* @return An observable sequence with the results from matching several patterns
5858+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
5859+
*/
5860+
@SuppressWarnings("unchecked")
5861+
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8, Plan0<R> p9) {
5862+
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9));
5863+
}
56935864
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.joins;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
21+
/**
22+
* Represents an activated plan.
23+
*/
24+
public abstract class ActivePlan0 {
25+
protected final Map<JoinObserver, JoinObserver> joinObservers = new HashMap<JoinObserver, JoinObserver>();
26+
27+
public abstract void match();
28+
29+
protected void addJoinObserver(JoinObserver joinObserver) {
30+
joinObservers.put(joinObserver, joinObserver);
31+
}
32+
protected void dequeue() {
33+
for (JoinObserver jo : joinObservers.values()) {
34+
jo.dequeue();
35+
}
36+
}
37+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action1;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan1<T1> extends ActivePlan0 {
26+
private final Action1<T1> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
public ActivePlan1(JoinObserver1<T1> first, Action1<T1> onNext, Action0 onCompleted) {
30+
this.onNext = onNext;
31+
this.onCompleted = onCompleted;
32+
this.first = first;
33+
addJoinObserver(first);
34+
}
35+
36+
@Override
37+
public void match() {
38+
if (!first.queue().isEmpty()) {
39+
Notification<T1> n1 = first.queue().peek();
40+
if (n1.isOnCompleted()) {
41+
onCompleted.call();
42+
} else {
43+
dequeue();
44+
onNext.call(n1.getValue());
45+
}
46+
}
47+
}
48+
49+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action2;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan2<T1, T2> extends ActivePlan0 {
26+
private final Action2<T1, T2> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
private final JoinObserver1<T2> second;
30+
public ActivePlan2(JoinObserver1<T1> first, JoinObserver1<T2> second, Action2<T1, T2> onNext, Action0 onCompleted) {
31+
this.onNext = onNext;
32+
this.onCompleted = onCompleted;
33+
this.first = first;
34+
this.second = second;
35+
addJoinObserver(first);
36+
addJoinObserver(second);
37+
}
38+
39+
@Override
40+
public void match() {
41+
if (!first.queue().isEmpty() && !second.queue().isEmpty()) {
42+
Notification<T1> n1 = first.queue().peek();
43+
Notification<T2> n2 = second.queue().peek();
44+
45+
if (n1.isOnCompleted() || n2.isOnCompleted()) {
46+
onCompleted.call();
47+
} else {
48+
dequeue();
49+
onNext.call(n1.getValue(), n2.getValue());
50+
}
51+
}
52+
}
53+
54+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action3;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
26+
private final Action3<T1, T2, T3> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
private final JoinObserver1<T2> second;
30+
private final JoinObserver1<T3> third;
31+
public ActivePlan3(JoinObserver1<T1> first,
32+
JoinObserver1<T2> second,
33+
JoinObserver1<T3> third,
34+
Action3<T1, T2, T3> onNext,
35+
Action0 onCompleted) {
36+
this.onNext = onNext;
37+
this.onCompleted = onCompleted;
38+
this.first = first;
39+
this.second = second;
40+
this.third = third;
41+
addJoinObserver(first);
42+
addJoinObserver(second);
43+
addJoinObserver(third);
44+
}
45+
46+
@Override
47+
public void match() {
48+
if (!first.queue().isEmpty()
49+
&& !second.queue().isEmpty()
50+
&& !third.queue().isEmpty()) {
51+
Notification<T1> n1 = first.queue().peek();
52+
Notification<T2> n2 = second.queue().peek();
53+
Notification<T3> n3 = third.queue().peek();
54+
55+
if (n1.isOnCompleted() || n2.isOnCompleted() || n3.isOnCompleted()) {
56+
onCompleted.call();
57+
} else {
58+
dequeue();
59+
onNext.call(n1.getValue(), n2.getValue(), n3.getValue());
60+
}
61+
}
62+
}
63+
64+
}

0 commit comments

Comments
 (0)