Skip to content

update rx java to 1.0.4 #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*.iml
*.ipr
*.iws
/.idea

# Build
build
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ install - install any jars produced to the local Maven repository (.m2)

dependencies {

compile "com.netflix.rxjava:rxjava-core:$rxjavaVersion"
compile "com.netflix.rxjava:rxjava-groovy:$rxjavaVersion"
compile "io.reactivex:rxjava:$rxjavaVersion"
compile "com.netflix.rxjava:rxjava-groovy:$rxjavaGroovyVersion"
testCompile files("src/test/conf")

// If you're creating Groovy compiled verticles you may need the following dependencies
Expand Down
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ toolsVersion=1.0.0-SNAPSHOT
junitVersion=4.11

# The version of RxJava
rxjavaVersion=0.9.0
rxjavaVersion=1.0.4
rxjavaGroovyVersion=0.9.0
20 changes: 10 additions & 10 deletions src/main/java/meez/rxvertx/java/RxSupport.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package meez.rxvertx.java;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.atomic.AtomicLong;

import meez.rxvertx.java.impl.SingleObserverHandler;
import org.codehaus.jackson.map.ObjectMapper;
import org.vertx.java.core.Handler;
Expand All @@ -9,15 +13,11 @@
import org.vertx.java.core.streams.ReadStream;
import org.vertx.java.core.streams.WriteStream;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.atomic.AtomicLong;

/** RxSupport */
public class RxSupport {
Expand Down Expand Up @@ -47,8 +47,8 @@ public void call(Buffer buffer) {
total.addAndGet(buffer.length());
}
},
new Action1<Exception>() {
public void call(Exception e) {
new Action1<Throwable>() {
public void call(Throwable e) {
rx.onError(e);
}
},
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/meez/rxvertx/java/http/RxHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.vertx.java.core.Handler;
import org.vertx.java.core.http.*;
import rx.Observable;
import rx.util.functions.Action1;
import rx.functions.Action1;

/** RxWrapper for HttpClient
* @author <a href="http://github.com/petermd">Peter McDonnell</a>
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/meez/rxvertx/java/http/RxHttpSupport.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package meez.rxvertx.java.http;

import java.io.UnsupportedEncodingException;

import meez.rxvertx.java.RxException;
import meez.rxvertx.java.RxSupport;
import org.vertx.java.core.buffer.Buffer;
Expand All @@ -8,10 +10,8 @@
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.json.JsonObject;
import rx.Observable;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

import java.io.UnsupportedEncodingException;
import rx.functions.Action1;
import rx.functions.Func1;

/** Utility methods for RxHttpXXX */
public class RxHttpSupport {
Expand Down
65 changes: 34 additions & 31 deletions src/main/java/meez/rxvertx/java/impl/MemoizeHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
import java.util.concurrent.atomic.AtomicReference;

import org.vertx.java.core.Handler;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

/** Subject that stores the result of a Handler and notfies all current and future Observers */
public class MemoizeHandler<R,T> implements Handler<T> {

/** States */
enum State { ACTIVE, COMPLETED, FAILED };
enum State { ACTIVE, COMPLETED, FAILED }

/** State */
private State state;
Expand All @@ -25,7 +25,7 @@ enum State { ACTIVE, COMPLETED, FAILED };
private Exception error;

/** Reference to active observer */
private AtomicReference<Observer<R>> obRef=new AtomicReference<Observer<R>>();
private AtomicReference<Subscriber<?super R>> obRef=new AtomicReference<Subscriber<? super R>>();

/** Create new MemoizeHandler */
public MemoizeHandler() {
Expand All @@ -35,34 +35,37 @@ public MemoizeHandler() {
}

/** Subscription function */
public Func1<Observer<R>, Subscription> subscribe=new Func1<Observer<R>, Subscription>() {
public Subscription call(Observer<R> newObserver) {
// Check if complete
switch(state) {
public Observable.OnSubscribe<R> subscribe = new Observable.OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> newObserver) {
// Check if complete
switch(state) {

// Completed. Forward the saved result
case COMPLETED:
dispatchResult(newObserver,result);
return Subscriptions.empty();

// Failed already. Forward the saved error
case FAILED:
dispatchError(newObserver,error);
return Subscriptions.empty();
}

// State=ACTIVE
if (!obRef.compareAndSet(null, newObserver))
throw new IllegalStateException("Cannot have multiple subscriptions");

return Subscriptions.create(unsubscribe);
}
};
// Completed. Forward the saved result
case COMPLETED:
dispatchResult(newObserver,result);
newObserver.add(Subscriptions.empty());
return;

// Failed already. Forward the saved error
case FAILED:
dispatchError(newObserver,error);
newObserver.add(Subscriptions.empty());
return;
}

// State=ACTIVE
if (!obRef.compareAndSet(null, newObserver))
throw new IllegalStateException("Cannot have multiple subscriptions");

newObserver.add(Subscriptions.create(unsubscribe));
}
};

/** Unsubscribe action */
public Action0 unsubscribe=new Action0() {
public void call() {
Observer<R> ob=obRef.getAndSet(null);
Subscriber<? super R> ob=obRef.getAndSet(null);
if (ob==null)
throw new IllegalStateException("Unsubscribe without subscribe");
}
Expand All @@ -73,7 +76,7 @@ public void complete(R value) {
this.result=value;
this.state=State.COMPLETED;

Observer<R> ob=obRef.get();
Subscriber<? super R> ob=obRef.get();
// Ignore if no active observer
if (ob==null)
return;
Expand All @@ -86,7 +89,7 @@ public void fail(Exception e) {
this.error=e;
this.state=State.FAILED;

Observer<R> ob=obRef.get();
Subscriber<? super R> ob=obRef.get();
// Ignore if no active observer
if (ob==null)
return;
Expand All @@ -106,7 +109,7 @@ public void handle(T value) {
// Implementation

/** Dispatch result */
private void dispatchResult(Observer<R> ob, R value) {
private void dispatchResult(Subscriber<? super R> ob, R value) {
try {
ob.onNext(value);
}
Expand Down
33 changes: 17 additions & 16 deletions src/main/java/meez/rxvertx/java/impl/SingleObserverHandler.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package meez.rxvertx.java.impl;

import java.util.concurrent.atomic.AtomicReference;

import org.vertx.java.core.Handler;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

import java.util.concurrent.atomic.AtomicReference;

/** Mapping from Handler to Observer that supports a single subscription and wrapping of the response object.
*
Expand All @@ -18,22 +18,23 @@
public abstract class SingleObserverHandler<R, T> implements Handler<T> {

/** Reference to active observer */
private AtomicReference<Observer<R>> obRef=new AtomicReference<Observer<R>>();
private AtomicReference<Subscriber<? super R>> obRef=new AtomicReference<Subscriber<? super R>>();

/** Subscription function */
public Func1<Observer<R>, Subscription> subscribe=new Func1<Observer<R>, Subscription>() {
public Subscription call(Observer<R> newObserver) {
if (!obRef.compareAndSet(null, newObserver))
throw new IllegalStateException("Cannot have multiple subscriptions");
register();
return Subscriptions.create(unsubscribe);
public Observable.OnSubscribe<R> subscribe = new Observable.OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> subscriber) {
if (!obRef.compareAndSet(null, subscriber))
throw new IllegalStateException("Cannot have multiple subscriptions");
register();
subscriber.add(Subscriptions.create(unsubscribe));
}
};

/** Unsubscribe action */
public Action0 unsubscribe = new Action0() {
public void call() {
Observer<R> ob=obRef.getAndSet(null);
Observer<? super R> ob=obRef.getAndSet(null);
if (ob==null)
throw new IllegalStateException("Unsubscribe without subscribe");
clear();
Expand All @@ -58,7 +59,7 @@ public R wrap(T value) {
/** Complete the handler */
public void complete() {

Observer<R> ob=obRef.get();
Observer<? super R> ob=obRef.get();
// Ignore if no active observer
if (ob==null)
return;
Expand All @@ -72,7 +73,7 @@ public void complete() {
/** Fail the handler - used to handle errors before the handler is called */
public void fail(Exception e) {

Observer<R> ob=obRef.get();
Observer<? super R> ob=obRef.get();
// Ignore if no active observer
if (ob==null)
return;
Expand All @@ -87,7 +88,7 @@ public void fail(Exception e) {

public void handle(T value) {

Observer<R> ob=obRef.get();
Observer<? super R> ob=obRef.get();
// Ignore if no active observer
if (ob==null)
return;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/meez/rxvertx/java/io/RxFileSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.file.AsyncFile;
import rx.Observable;
import rx.util.functions.Func1;
import rx.functions.Func1;

/** RxFileSupport utility methods */
public class RxFileSupport {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/meez/rxvertx/java/io/RxFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public RxFileSystem(FileSystem fileSystem) {
public Observable<Void> copyRx(String s, String s1) {
AsyncResultMemoizeHandler<Void> rh=new AsyncResultMemoizeHandler<Void>();
nested.copy(s,s1,rh);
return Observable.create(rh);
return Observable.create(rh.subscribe);
}

public Observable<Void> copyRx(String s, String s1, boolean b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
import rx.Observable;
import rx.util.functions.Action1;
import rx.functions.Action1;

/** Pipeline for handling BusMod requests */
public class EventBusPipeline<T> extends HandlerPipeline<Message<T>,Message<T>,T> {
Expand Down Expand Up @@ -46,9 +46,9 @@ public void call(T value) {
}

/** Return error renderer */
public Action1<Exception> renderError(final Message<T> src) {
return new Action1<Exception>() {
public void call(Exception e) {
public Action1<Throwable> renderError(final Message<T> src) {
return new Action1<Throwable>() {
public void call(Throwable e) {
// There is no standard way to send errors to non-JsonObject messages. Log and ignore for now
log.warn("EventBus handler failed (req="+src+")",e);
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/meez/rxvertx/java/pipeline/HttpServerPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.json.JsonObject;
import rx.Observable;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.functions.Action0;
import rx.functions.Action1;

/** Pipeline for handling HttpServer requests */
public class HttpServerPipeline<T> extends HandlerPipeline<RxHttpServerRequest, HttpServerRequest, T> {
Expand Down Expand Up @@ -55,9 +55,9 @@ else if (o instanceof JsonObject) {
}

/** Return error renderer */
public Action1<Exception> renderError(final HttpServerRequest src) {
return new Action1<Exception>() {
public void call(Exception e) {
public Action1<Throwable> renderError(final HttpServerRequest src) {
return new Action1<Throwable>() {
public void call(Throwable e) {
src.response.statusCode=500;
src.response.statusMessage="Request failed: "+e;
src.response.putHeader("Content-type","text/plain");
Expand Down
Loading