Skip to content

Commit 1573df9

Browse files
Merge pull request #770 from benjchristensen/bind
Bind Operator
2 parents f17e934 + 47c20f8 commit 1573df9

File tree

58 files changed

+2511
-2163
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+2511
-2163
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.groovy;
17+
18+
import groovy.lang.Closure;
19+
import rx.Operator;
20+
import rx.Subscription;
21+
import rx.util.functions.Action1;
22+
23+
public class GroovyCreateWrapper<T> implements Action1<Operator<? super T>> {
24+
25+
private final Closure<Void> closure;
26+
27+
public GroovyCreateWrapper(Closure<Void> closure) {
28+
this.closure = closure;
29+
}
30+
31+
@Override
32+
public void call(Operator<? super T> op) {
33+
Object o = closure.call(op);
34+
/*
35+
* If the new signature is being used, we will get NULL back.
36+
* If the old is being used we will get a Subscription back.
37+
*/
38+
if (o != null) {
39+
op.add((Subscription) o);
40+
}
41+
}
42+
43+
}

language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyExtensionModule.java

+55-7
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,16 @@
1616
package rx.lang.groovy;
1717

1818
import groovy.lang.Closure;
19-
import groovy.lang.GroovySystem;
2019
import groovy.lang.MetaMethod;
2120

2221
import java.lang.reflect.InvocationTargetException;
2322
import java.lang.reflect.Method;
2423
import java.util.ArrayList;
25-
import java.util.HashMap;
2624
import java.util.List;
27-
import java.util.Map;
28-
import java.util.Properties;
2925

3026
import org.codehaus.groovy.reflection.CachedClass;
3127
import org.codehaus.groovy.reflection.ReflectionCache;
3228
import org.codehaus.groovy.runtime.m12n.ExtensionModule;
33-
import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl;
3429

3530
import rx.Observable;
3631
import rx.Observable.OnSubscribeFunc;
@@ -75,6 +70,9 @@ public List<MetaMethod> getMetaMethods() {
7570
}
7671

7772
private MetaMethod createMetaMethod(final Method m) {
73+
if (m.getDeclaringClass().equals(Observable.class) && m.getName().equals("create")) {
74+
return specialCasedOverrideForCreate(m);
75+
}
7876
return new MetaMethod() {
7977

8078
@Override
@@ -109,12 +107,11 @@ public Object invoke(Object object, Object[] arguments) {
109107
if (o instanceof Closure) {
110108
if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) {
111109
newArgs[i] = new GroovyActionWrapper((Closure) o);
112-
} else if(OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
110+
} else if (OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
113111
newArgs[i] = new GroovyOnSubscribeFuncWrapper((Closure) o);
114112
} else {
115113
newArgs[i] = new GroovyFunctionWrapper((Closure) o);
116114
}
117-
118115
} else {
119116
newArgs[i] = o;
120117
}
@@ -152,4 +149,55 @@ public CachedClass[] getParameterTypes() {
152149
}
153150
};
154151
}
152+
153+
/**
154+
* Special case until we finish migrating off the deprecated 'create' method signature
155+
*/
156+
private MetaMethod specialCasedOverrideForCreate(final Method m) {
157+
return new MetaMethod() {
158+
159+
@Override
160+
public int getModifiers() {
161+
return m.getModifiers();
162+
}
163+
164+
@Override
165+
public String getName() {
166+
return m.getName();
167+
}
168+
169+
@Override
170+
public Class getReturnType() {
171+
return m.getReturnType();
172+
}
173+
174+
@Override
175+
public CachedClass getDeclaringClass() {
176+
return ReflectionCache.getCachedClass(m.getDeclaringClass());
177+
}
178+
179+
@Override
180+
public Object invoke(Object object, final Object[] arguments) {
181+
return Observable.create(new GroovyCreateWrapper((Closure) arguments[0]));
182+
}
183+
184+
@SuppressWarnings("rawtypes")
185+
@Override
186+
public CachedClass[] getParameterTypes() {
187+
Class[] pts = m.getParameterTypes();
188+
CachedClass[] cc = new CachedClass[pts.length];
189+
for (int i = 0; i < pts.length; i++) {
190+
if (Function.class.isAssignableFrom(pts[i])) {
191+
// function type to be replaced by closure
192+
cc[i] = ReflectionCache.getCachedClass(Closure.class);
193+
} else {
194+
// non-function type
195+
cc[i] = ReflectionCache.getCachedClass(pts[i]);
196+
}
197+
}
198+
return cc;
199+
}
200+
};
201+
}
202+
155203
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,8 @@ trait Observable[+T]
310310
* their index. Indices start at 0.
311311
*/
312312
def zipWithIndex: Observable[(T, Int)] = {
313-
val fScala: (T, Integer) => (T, Int) = (elem: T, index: Integer) => (elem, index)
314-
val fJava : Func2[_ >: T, Integer, _ <: (T, Int)] = fScala
315-
toScalaObservable[(T, Int)](asJavaObservable.mapWithIndex[(T, Int)](fJava))
313+
var n = 0;
314+
this.map(x => { val result = (x,n); n += 1; result })
316315
}
317316

318317
/**

0 commit comments

Comments
 (0)