diff --git a/.gitignore b/.gitignore index 1ee9075..d8a4c29 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ *.iml *.ipr *.iws +/.idea # Build build diff --git a/build.gradle b/build.gradle index cf268dd..511582c 100644 --- a/build.gradle +++ b/build.gradle @@ -44,8 +44,8 @@ install - install any jars produced to the local Maven repository (.m2) dependencies { - compile "com.netflix.rxjava:rxjava-core:$rxjavaVersion" - compile "com.netflix.rxjava:rxjava-groovy:$rxjavaVersion" + compile "io.reactivex:rxjava:$rxjavaVersion" + compile "com.netflix.rxjava:rxjava-groovy:$rxjavaGroovyVersion" testCompile files("src/test/conf") // If you're creating Groovy compiled verticles you may need the following dependencies diff --git a/gradle.properties b/gradle.properties index 42f6adf..8876f1d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -32,4 +32,5 @@ toolsVersion=1.0.0-SNAPSHOT junitVersion=4.11 # The version of RxJava -rxjavaVersion=0.9.0 +rxjavaVersion=1.0.4 +rxjavaGroovyVersion=0.9.0 diff --git a/src/main/java/meez/rxvertx/java/RxSupport.java b/src/main/java/meez/rxvertx/java/RxSupport.java index b9312c3..82b8c40 100644 --- a/src/main/java/meez/rxvertx/java/RxSupport.java +++ b/src/main/java/meez/rxvertx/java/RxSupport.java @@ -1,5 +1,9 @@ package meez.rxvertx.java; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.concurrent.atomic.AtomicLong; + import meez.rxvertx.java.impl.SingleObserverHandler; import org.codehaus.jackson.map.ObjectMapper; import org.vertx.java.core.Handler; @@ -9,15 +13,11 @@ import org.vertx.java.core.streams.ReadStream; import org.vertx.java.core.streams.WriteStream; import rx.Observable; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; import rx.subjects.PublishSubject; -import rx.util.functions.Action0; -import rx.util.functions.Action1; -import rx.util.functions.Func1; -import rx.util.functions.Func2; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.concurrent.atomic.AtomicLong; /** RxSupport */ public class RxSupport { @@ -47,8 +47,8 @@ public void call(Buffer buffer) { total.addAndGet(buffer.length()); } }, - new Action1() { - public void call(Exception e) { + new Action1() { + public void call(Throwable e) { rx.onError(e); } }, diff --git a/src/main/java/meez/rxvertx/java/http/RxHttpClient.java b/src/main/java/meez/rxvertx/java/http/RxHttpClient.java index e79df56..da601c0 100644 --- a/src/main/java/meez/rxvertx/java/http/RxHttpClient.java +++ b/src/main/java/meez/rxvertx/java/http/RxHttpClient.java @@ -7,7 +7,7 @@ import org.vertx.java.core.Handler; import org.vertx.java.core.http.*; import rx.Observable; -import rx.util.functions.Action1; +import rx.functions.Action1; /** RxWrapper for HttpClient * @author Peter McDonnell diff --git a/src/main/java/meez/rxvertx/java/http/RxHttpSupport.java b/src/main/java/meez/rxvertx/java/http/RxHttpSupport.java index d38a42c..d52cc14 100644 --- a/src/main/java/meez/rxvertx/java/http/RxHttpSupport.java +++ b/src/main/java/meez/rxvertx/java/http/RxHttpSupport.java @@ -1,5 +1,7 @@ package meez.rxvertx.java.http; +import java.io.UnsupportedEncodingException; + import meez.rxvertx.java.RxException; import meez.rxvertx.java.RxSupport; import org.vertx.java.core.buffer.Buffer; @@ -8,10 +10,8 @@ import org.vertx.java.core.http.HttpServerRequest; import org.vertx.java.core.json.JsonObject; import rx.Observable; -import rx.util.functions.Action1; -import rx.util.functions.Func1; - -import java.io.UnsupportedEncodingException; +import rx.functions.Action1; +import rx.functions.Func1; /** Utility methods for RxHttpXXX */ public class RxHttpSupport { diff --git a/src/main/java/meez/rxvertx/java/impl/MemoizeHandler.java b/src/main/java/meez/rxvertx/java/impl/MemoizeHandler.java index 1da4471..6902e8a 100644 --- a/src/main/java/meez/rxvertx/java/impl/MemoizeHandler.java +++ b/src/main/java/meez/rxvertx/java/impl/MemoizeHandler.java @@ -3,17 +3,17 @@ import java.util.concurrent.atomic.AtomicReference; import org.vertx.java.core.Handler; +import rx.Observable; import rx.Observer; -import rx.Subscription; +import rx.Subscriber; +import rx.functions.Action0; import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Func1; /** Subject that stores the result of a Handler and notfies all current and future Observers */ public class MemoizeHandler implements Handler { /** States */ - enum State { ACTIVE, COMPLETED, FAILED }; + enum State { ACTIVE, COMPLETED, FAILED } /** State */ private State state; @@ -25,7 +25,7 @@ enum State { ACTIVE, COMPLETED, FAILED }; private Exception error; /** Reference to active observer */ - private AtomicReference> obRef=new AtomicReference>(); + private AtomicReference> obRef=new AtomicReference>(); /** Create new MemoizeHandler */ public MemoizeHandler() { @@ -35,34 +35,37 @@ public MemoizeHandler() { } /** Subscription function */ - public Func1, Subscription> subscribe=new Func1, Subscription>() { - public Subscription call(Observer newObserver) { - // Check if complete - switch(state) { + public Observable.OnSubscribe subscribe = new Observable.OnSubscribe() { + @Override + public void call(Subscriber newObserver) { + // Check if complete + switch(state) { - // Completed. Forward the saved result - case COMPLETED: - dispatchResult(newObserver,result); - return Subscriptions.empty(); - - // Failed already. Forward the saved error - case FAILED: - dispatchError(newObserver,error); - return Subscriptions.empty(); - } - - // State=ACTIVE - if (!obRef.compareAndSet(null, newObserver)) - throw new IllegalStateException("Cannot have multiple subscriptions"); - - return Subscriptions.create(unsubscribe); - } - }; + // Completed. Forward the saved result + case COMPLETED: + dispatchResult(newObserver,result); + newObserver.add(Subscriptions.empty()); + return; + + // Failed already. Forward the saved error + case FAILED: + dispatchError(newObserver,error); + newObserver.add(Subscriptions.empty()); + return; + } + + // State=ACTIVE + if (!obRef.compareAndSet(null, newObserver)) + throw new IllegalStateException("Cannot have multiple subscriptions"); + + newObserver.add(Subscriptions.create(unsubscribe)); + } + }; /** Unsubscribe action */ public Action0 unsubscribe=new Action0() { public void call() { - Observer ob=obRef.getAndSet(null); + Subscriber ob=obRef.getAndSet(null); if (ob==null) throw new IllegalStateException("Unsubscribe without subscribe"); } @@ -73,7 +76,7 @@ public void complete(R value) { this.result=value; this.state=State.COMPLETED; - Observer ob=obRef.get(); + Subscriber ob=obRef.get(); // Ignore if no active observer if (ob==null) return; @@ -86,7 +89,7 @@ public void fail(Exception e) { this.error=e; this.state=State.FAILED; - Observer ob=obRef.get(); + Subscriber ob=obRef.get(); // Ignore if no active observer if (ob==null) return; @@ -106,7 +109,7 @@ public void handle(T value) { // Implementation /** Dispatch result */ - private void dispatchResult(Observer ob, R value) { + private void dispatchResult(Subscriber ob, R value) { try { ob.onNext(value); } diff --git a/src/main/java/meez/rxvertx/java/impl/SingleObserverHandler.java b/src/main/java/meez/rxvertx/java/impl/SingleObserverHandler.java index f08868e..65f7064 100644 --- a/src/main/java/meez/rxvertx/java/impl/SingleObserverHandler.java +++ b/src/main/java/meez/rxvertx/java/impl/SingleObserverHandler.java @@ -1,13 +1,13 @@ package meez.rxvertx.java.impl; +import java.util.concurrent.atomic.AtomicReference; + import org.vertx.java.core.Handler; +import rx.Observable; import rx.Observer; -import rx.Subscription; +import rx.Subscriber; +import rx.functions.Action0; import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Func1; - -import java.util.concurrent.atomic.AtomicReference; /** Mapping from Handler to Observer that supports a single subscription and wrapping of the response object. * @@ -18,22 +18,23 @@ public abstract class SingleObserverHandler implements Handler { /** Reference to active observer */ - private AtomicReference> obRef=new AtomicReference>(); + private AtomicReference> obRef=new AtomicReference>(); /** Subscription function */ - public Func1, Subscription> subscribe=new Func1, Subscription>() { - public Subscription call(Observer newObserver) { - if (!obRef.compareAndSet(null, newObserver)) - throw new IllegalStateException("Cannot have multiple subscriptions"); - register(); - return Subscriptions.create(unsubscribe); + public Observable.OnSubscribe subscribe = new Observable.OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + if (!obRef.compareAndSet(null, subscriber)) + throw new IllegalStateException("Cannot have multiple subscriptions"); + register(); + subscriber.add(Subscriptions.create(unsubscribe)); } }; /** Unsubscribe action */ public Action0 unsubscribe = new Action0() { public void call() { - Observer ob=obRef.getAndSet(null); + Observer ob=obRef.getAndSet(null); if (ob==null) throw new IllegalStateException("Unsubscribe without subscribe"); clear(); @@ -58,7 +59,7 @@ public R wrap(T value) { /** Complete the handler */ public void complete() { - Observer ob=obRef.get(); + Observer ob=obRef.get(); // Ignore if no active observer if (ob==null) return; @@ -72,7 +73,7 @@ public void complete() { /** Fail the handler - used to handle errors before the handler is called */ public void fail(Exception e) { - Observer ob=obRef.get(); + Observer ob=obRef.get(); // Ignore if no active observer if (ob==null) return; @@ -87,7 +88,7 @@ public void fail(Exception e) { public void handle(T value) { - Observer ob=obRef.get(); + Observer ob=obRef.get(); // Ignore if no active observer if (ob==null) return; diff --git a/src/main/java/meez/rxvertx/java/io/RxFileSupport.java b/src/main/java/meez/rxvertx/java/io/RxFileSupport.java index 026d2b8..050ef60 100644 --- a/src/main/java/meez/rxvertx/java/io/RxFileSupport.java +++ b/src/main/java/meez/rxvertx/java/io/RxFileSupport.java @@ -5,7 +5,7 @@ import org.vertx.java.core.buffer.Buffer; import org.vertx.java.core.file.AsyncFile; import rx.Observable; -import rx.util.functions.Func1; +import rx.functions.Func1; /** RxFileSupport utility methods */ public class RxFileSupport { diff --git a/src/main/java/meez/rxvertx/java/io/RxFileSystem.java b/src/main/java/meez/rxvertx/java/io/RxFileSystem.java index fd101a5..94d9da3 100644 --- a/src/main/java/meez/rxvertx/java/io/RxFileSystem.java +++ b/src/main/java/meez/rxvertx/java/io/RxFileSystem.java @@ -25,7 +25,7 @@ public RxFileSystem(FileSystem fileSystem) { public Observable copyRx(String s, String s1) { AsyncResultMemoizeHandler rh=new AsyncResultMemoizeHandler(); nested.copy(s,s1,rh); - return Observable.create(rh); + return Observable.create(rh.subscribe); } public Observable copyRx(String s, String s1, boolean b) { diff --git a/src/main/java/meez/rxvertx/java/pipeline/EventBusPipeline.java b/src/main/java/meez/rxvertx/java/pipeline/EventBusPipeline.java index 80fd4c3..cf68bfc 100644 --- a/src/main/java/meez/rxvertx/java/pipeline/EventBusPipeline.java +++ b/src/main/java/meez/rxvertx/java/pipeline/EventBusPipeline.java @@ -5,7 +5,7 @@ import org.vertx.java.core.logging.Logger; import org.vertx.java.core.logging.impl.LoggerFactory; import rx.Observable; -import rx.util.functions.Action1; +import rx.functions.Action1; /** Pipeline for handling BusMod requests */ public class EventBusPipeline extends HandlerPipeline,Message,T> { @@ -46,9 +46,9 @@ public void call(T value) { } /** Return error renderer */ - public Action1 renderError(final Message src) { - return new Action1() { - public void call(Exception e) { + public Action1 renderError(final Message src) { + return new Action1() { + public void call(Throwable e) { // There is no standard way to send errors to non-JsonObject messages. Log and ignore for now log.warn("EventBus handler failed (req="+src+")",e); } diff --git a/src/main/java/meez/rxvertx/java/pipeline/HttpServerPipeline.java b/src/main/java/meez/rxvertx/java/pipeline/HttpServerPipeline.java index ddf9ce4..a582ffa 100644 --- a/src/main/java/meez/rxvertx/java/pipeline/HttpServerPipeline.java +++ b/src/main/java/meez/rxvertx/java/pipeline/HttpServerPipeline.java @@ -6,8 +6,8 @@ import org.vertx.java.core.http.HttpServerRequest; import org.vertx.java.core.json.JsonObject; import rx.Observable; -import rx.util.functions.Action0; -import rx.util.functions.Action1; +import rx.functions.Action0; +import rx.functions.Action1; /** Pipeline for handling HttpServer requests */ public class HttpServerPipeline extends HandlerPipeline { @@ -55,9 +55,9 @@ else if (o instanceof JsonObject) { } /** Return error renderer */ - public Action1 renderError(final HttpServerRequest src) { - return new Action1() { - public void call(Exception e) { + public Action1 renderError(final HttpServerRequest src) { + return new Action1() { + public void call(Throwable e) { src.response.statusCode=500; src.response.statusMessage="Request failed: "+e; src.response.putHeader("Content-type","text/plain"); diff --git a/src/test/java/meez/rxvertx/java/RxTestSupport.java b/src/test/java/meez/rxvertx/java/RxTestSupport.java index 5212244..75e9909 100644 --- a/src/test/java/meez/rxvertx/java/RxTestSupport.java +++ b/src/test/java/meez/rxvertx/java/RxTestSupport.java @@ -5,7 +5,9 @@ import org.vertx.java.testframework.TestUtils; import rx.Observer; -import rx.util.functions.*; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; /** RxTestSupport */ public class RxTestSupport { @@ -105,9 +107,9 @@ public void call(T t) { }; } - public static Action1 testFailed(final TestUtils tu) { - return new Action1() { - public void call(Exception e) { + public static Action1 testFailed(final TestUtils tu) { + return new Action1() { + public void call(Throwable e) { System.err.println("Test failed: "+e); e.printStackTrace(System.err); tu.azzert(false); @@ -150,7 +152,7 @@ public void onCompleted() { // Call max once idx++; } - public void onError(Exception e) { + public void onError(Throwable e) { // Call never tu.azzert(false,"Unexpected error (e="+e+")"); latch.countDown(); @@ -172,7 +174,7 @@ public void onCompleted() { tu.azzert(count==1,"onCompleted() with no value"); latch.countDown(); } - public void onError(Exception e) { + public void onError(Throwable e) { tu.azzert(false,"Unexpected error (e="+e+")"); latch.countDown(); } @@ -189,7 +191,7 @@ public void onCompleted() { tu.azzert(false,"Error expected"); } @SuppressWarnings("unchecked") - public void onError(Exception e) { + public void onError(Throwable e) { tu.azzert(errType.isAssignableFrom(e.getClass())); System.out.println("Expected error: "+e); tu.testComplete(); diff --git a/src/test/java/meez/rxvertx/java/impl/TestResultHandler.java b/src/test/java/meez/rxvertx/java/impl/TestResultHandler.java index 57c71a6..dc663c0 100644 --- a/src/test/java/meez/rxvertx/java/impl/TestResultHandler.java +++ b/src/test/java/meez/rxvertx/java/impl/TestResultHandler.java @@ -1,18 +1,18 @@ package meez.rxvertx.java.impl; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + import meez.rxvertx.java.RxTestSupport; import org.junit.Test; import org.vertx.java.core.Handler; import org.vertx.java.testframework.TestBase; import org.vertx.java.testframework.TestUtils; import rx.Observable; -import rx.util.functions.Action0; -import rx.util.functions.Action1; -import rx.util.functions.Func1; -import rx.util.functions.Func2; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; /** * TestResultHandler @@ -126,27 +126,27 @@ public void testCascading() throws InterruptedException { Observable txa= a1 - .mapMany(new Func1>() { - public Observable call(Long aLong) { - System.out.println("a1 -> start a2"); - ResultMemoizeHandler a2h=new ResultMemoizeHandler(); - Observable a2=Observable.create(a2h.subscribe); - vertx.setTimer(100,a2h); - return a2; - } - }); + .flatMap(new Func1>() { + public Observable call(Long aLong) { + System.out.println("a1 -> start a2"); + ResultMemoizeHandler a2h = new ResultMemoizeHandler(); + Observable a2 = Observable.create(a2h.subscribe); + vertx.setTimer(100, a2h); + return a2; + } + }); Observable txb= b1 - .mapMany(new Func1>() { - public Observable call(Long aLong) { - System.out.println("b1 -> start b2"); - ResultMemoizeHandler b2h=new ResultMemoizeHandler(); - Observable b2=Observable.create(b2h.subscribe); - vertx.setTimer(100,b2h); - return b2; - } - }); + .flatMap(new Func1>() { + public Observable call(Long aLong) { + System.out.println("b1 -> start b2"); + ResultMemoizeHandler b2h = new ResultMemoizeHandler(); + Observable b2 = Observable.create(b2h.subscribe); + vertx.setTimer(100, b2h); + return b2; + } + }); // Subscribe to both //txa.subscribe(RxTestSupport.traceValue("txa")); diff --git a/src/test/java/vertx/tests/rxjava/EventBus.java b/src/test/java/vertx/tests/rxjava/EventBus.java index f93abe3..298f0a3 100644 --- a/src/test/java/vertx/tests/rxjava/EventBus.java +++ b/src/test/java/vertx/tests/rxjava/EventBus.java @@ -1,8 +1,10 @@ package vertx.tests.rxjava; +import java.util.LinkedList; +import java.util.List; + import meez.rxvertx.java.RxEventBus; import meez.rxvertx.java.RxTestSupport; -import static meez.rxvertx.java.RxTestSupport.*; import meez.rxvertx.java.RxVertx; import meez.rxvertx.java.pipeline.EventBusPipeline; import org.vertx.java.core.buffer.Buffer; @@ -11,13 +13,11 @@ import org.vertx.java.core.json.JsonObject; import org.vertx.java.testframework.TestClientBase; import rx.Observable; -import rx.util.functions.Action0; -import rx.util.functions.Action1; -import rx.util.functions.Func1; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; +import static meez.rxvertx.java.RxTestSupport.toList; /** EventBus */ public class EventBus extends TestClientBase { @@ -56,8 +56,8 @@ public void call(Object v) { } }, // onError - new Action1() { - public void call(Exception e) { + new Action1() { + public void call(Throwable e) { System.err.println("Pipeline failed "+e); e.printStackTrace(System.err); tu.azzert(false); @@ -90,9 +90,9 @@ protected void azzertEquals(Object a, Object b) { } } - protected Action1> checkLoopBack(final T expected) { - return new Action1>() { - public void call(Message in) { + protected Action1> checkLoopBack(final T expected) { + return new Action1>() { + public void call(Message in) { System.out.println("loopback("+expected+")="+in.body); azzertEquals(expected,in.body); } @@ -187,7 +187,7 @@ public void testSendRx() throws Exception { typeByte), completeTest()); - eb.sendRx("sendrx",typeString).subscribe(checkLoopBack(typeJsonObject)); + eb.sendRx("sendrx",typeString).subscribe(checkLoopBack(typeString)); eb.sendRx("sendrx",typeJsonObject).subscribe(checkLoopBack(typeJsonObject)); eb.sendRx("sendrx",typeJsonArray).subscribe(checkLoopBack(typeJsonArray)); eb.sendRx("sendrx",typeBuffer).subscribe(checkLoopBack(typeBuffer)); @@ -237,12 +237,12 @@ public JsonObject call(JsonObject in) { // Send ping and wait for pong eb.sendRx("/test/ping",new JsonObject().putString("msg","ping")) - .map(RxTestSupport.traceMap("pipeline:pong")) + .map(RxTestSupport.>traceMap("pipeline:pong")) .subscribe(new Action1>() { - public void call(Message in) { - tu.azzert("pong".equals(in.body.getString("msg"))); - tu.testComplete(); - } - }); + public void call(Message in) { + tu.azzert("pong".equals(in.body.getString("msg"))); + tu.testComplete(); + } + }); } } diff --git a/src/test/java/vertx/tests/rxjava/FileSystem.java b/src/test/java/vertx/tests/rxjava/FileSystem.java index f543550..bd69813 100644 --- a/src/test/java/vertx/tests/rxjava/FileSystem.java +++ b/src/test/java/vertx/tests/rxjava/FileSystem.java @@ -1,17 +1,5 @@ package vertx.tests.rxjava; -import meez.rxvertx.java.RxSupport; -import meez.rxvertx.java.RxVertx; -import meez.rxvertx.java.io.RxFileSupport; -import org.vertx.java.core.buffer.Buffer; -import org.vertx.java.core.file.AsyncFile; -import org.vertx.java.core.impl.Windows; -import org.vertx.java.testframework.TestClientBase; -import org.vertx.java.testframework.TestUtils; -import rx.subjects.PublishSubject; -import rx.util.functions.Action0; -import rx.util.functions.Action1; - import java.io.File; import java.io.IOException; import java.nio.file.FileSystems; @@ -22,6 +10,18 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.Set; +import meez.rxvertx.java.RxSupport; +import meez.rxvertx.java.RxVertx; +import meez.rxvertx.java.io.RxFileSupport; +import org.vertx.java.core.buffer.Buffer; +import org.vertx.java.core.file.AsyncFile; +import org.vertx.java.core.impl.Windows; +import org.vertx.java.testframework.TestClientBase; +import org.vertx.java.testframework.TestUtils; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.subjects.PublishSubject; + /** FileSystem */ public class FileSystem extends TestClientBase { @@ -52,8 +52,8 @@ public void stop() { super.stop(); } - public Action1 onTestFailed=new Action1() { - public void call(Exception e) { + public Action1 onTestFailed=new Action1() { + public void call(Throwable e) { System.err.println("Test failure (e="+e+")"); e.printStackTrace(System.err); tu.exception(e,"test failed"); diff --git a/src/test/java/vertx/tests/rxjava/HttpTestClient.java b/src/test/java/vertx/tests/rxjava/HttpTestClient.java index 7cb35d4..32307fa 100644 --- a/src/test/java/vertx/tests/rxjava/HttpTestClient.java +++ b/src/test/java/vertx/tests/rxjava/HttpTestClient.java @@ -6,13 +6,16 @@ import meez.rxvertx.java.RxTestSupport; import meez.rxvertx.java.RxTimer; -import meez.rxvertx.java.http.*; +import meez.rxvertx.java.http.RxHttpClient; +import meez.rxvertx.java.http.RxHttpServer; +import meez.rxvertx.java.http.RxHttpServerRequest; +import meez.rxvertx.java.http.RxHttpSupport; import meez.rxvertx.java.pipeline.HttpServerPipeline; import org.vertx.java.core.json.JsonObject; import org.vertx.java.testframework.TestClientBase; import rx.Observable; -import rx.util.functions.Action1; -import rx.util.functions.Func2; +import rx.functions.Action1; +import rx.functions.Func2; /** HttpTestClient */ public class HttpTestClient extends TestClientBase { diff --git a/src/test/java/vertx/tests/rxjava/JsonServer.java b/src/test/java/vertx/tests/rxjava/JsonServer.java index 4fb8049..f025035 100644 --- a/src/test/java/vertx/tests/rxjava/JsonServer.java +++ b/src/test/java/vertx/tests/rxjava/JsonServer.java @@ -10,7 +10,7 @@ import org.vertx.java.deploy.Verticle; import org.vertx.java.testframework.TestUtils; import rx.Observable; -import rx.util.functions.Func1; +import rx.functions.Func1; /** JsonServer */ public class JsonServer extends Verticle { diff --git a/src/test/java/vertx/tests/rxjava/NetTestClient.java b/src/test/java/vertx/tests/rxjava/NetTestClient.java index 6ef8b87..47e1f08 100644 --- a/src/test/java/vertx/tests/rxjava/NetTestClient.java +++ b/src/test/java/vertx/tests/rxjava/NetTestClient.java @@ -1,5 +1,7 @@ package vertx.tests.rxjava; +import java.io.UnsupportedEncodingException; + import meez.rxvertx.java.RxSupport; import meez.rxvertx.java.RxTestSupport; import meez.rxvertx.java.net.RxNetClient; @@ -7,9 +9,7 @@ import meez.rxvertx.java.net.RxNetSocket; import org.vertx.java.core.buffer.Buffer; import org.vertx.java.testframework.TestClientBase; -import rx.util.functions.Action1; - -import java.io.UnsupportedEncodingException; +import rx.functions.Action1; /** * NetTestClient diff --git a/src/test/java/vertx/tests/rxjava/ProxyServer.java b/src/test/java/vertx/tests/rxjava/ProxyServer.java index 7a9ab0e..1ac973b 100644 --- a/src/test/java/vertx/tests/rxjava/ProxyServer.java +++ b/src/test/java/vertx/tests/rxjava/ProxyServer.java @@ -10,7 +10,7 @@ import org.vertx.java.deploy.Verticle; import org.vertx.java.testframework.TestUtils; import rx.Observable; -import rx.util.functions.Func2; +import rx.functions.Func2; /** Simple ProxyServer that turns a single request into two remote requests and merges the result */ public class ProxyServer extends Verticle { diff --git a/src/test/java/vertx/tests/rxjava/Timer.java b/src/test/java/vertx/tests/rxjava/Timer.java index dd326af..2e43d11 100644 --- a/src/test/java/vertx/tests/rxjava/Timer.java +++ b/src/test/java/vertx/tests/rxjava/Timer.java @@ -1,12 +1,12 @@ package vertx.tests.rxjava; -import meez.rxvertx.java.RxTimer; -import org.vertx.java.testframework.TestClientBase; -import rx.util.functions.Action1; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import meez.rxvertx.java.RxTimer; +import org.vertx.java.testframework.TestClientBase; +import rx.functions.Action1; + import static meez.rxvertx.java.RxTestSupport.*; /**