Skip to content

Implemented 'Synchronize' with 'lock' #409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 25, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,27 @@ public Observable<T> synchronize() {
return create(OperationSynchronize.synchronize(this));
}

/**
* Accepts an Observable and wraps it in another Observable that ensures that the resulting
* Observable is chronologically well-behaved. This is accomplished by acquiring a mutual-exclusion lock for the object provided as the lock parameter.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/synchronize.png">
* <p>
* A well-behaved Observable does not interleave its invocations of the {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and {@link Observer#onError onError} methods of
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
*
* @param lock
* The lock object to synchronize each observer call on
* @param <T>
* the type of item emitted by the source Observable
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its {@link Observer}s
*/
public Observable<T> synchronize(Object lock) {
return create(OperationSynchronize.synchronize(this, lock));
}

/**
* @deprecated Replaced with instance method.
*/
Expand Down
31 changes: 28 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationSynchronize.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,46 @@ public final class OperationSynchronize<T> {
* @return the wrapped synchronized observable sequence
*/
public static <T> OnSubscribeFunc<T> synchronize(Observable<? extends T> observable) {
return new Synchronize<T>(observable);
return new Synchronize<T>(observable, null);
}

/**
* Accepts an observable and wraps it in another observable which ensures that the resulting observable is well-behaved.
* This is accomplished by acquiring a mutual-exclusion lock for the object provided as the lock parameter.
*
* A well-behaved observable ensures onNext, onCompleted, or onError calls to its subscribers are
* not interleaved, onCompleted and onError are only called once respectively, and no
* onNext calls follow onCompleted and onError calls.
*
* @param observable
* @param lock
* The lock object to synchronize each observer call on
* @param <T>
* @return the wrapped synchronized observable sequence
*/
public static <T> OnSubscribeFunc<T> synchronize(Observable<? extends T> observable, Object lock) {
return new Synchronize<T>(observable, lock);
}

private static class Synchronize<T> implements OnSubscribeFunc<T> {

public Synchronize(Observable<? extends T> innerObservable) {
public Synchronize(Observable<? extends T> innerObservable, Object lock) {
this.innerObservable = innerObservable;
this.lock = lock;
}

private Observable<? extends T> innerObservable;
private SynchronizedObserver<T> atomicObserver;
private Object lock;

public Subscription onSubscribe(Observer<? super T> observer) {
SafeObservableSubscription subscription = new SafeObservableSubscription();
atomicObserver = new SynchronizedObserver<T>(observer, subscription);
if(lock == null) {
atomicObserver = new SynchronizedObserver<T>(observer, subscription);
}
else {
atomicObserver = new SynchronizedObserver<T>(observer, subscription, lock);
}
return subscription.wrap(innerObservable.subscribe(atomicObserver));
}

Expand Down
230 changes: 225 additions & 5 deletions rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -68,10 +69,18 @@ public final class SynchronizedObserver<T> implements Observer<T> {
private final SafeObservableSubscription subscription;
private volatile boolean finishRequested = false;
private volatile boolean finished = false;
private volatile Object lock;

public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscription subscription) {
this.observer = Observer;
this.subscription = subscription;
this.lock = this;
}

public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscription subscription, Object lock) {
this.observer = Observer;
this.subscription = subscription;
this.lock = lock;
}

/**
Expand All @@ -80,16 +89,15 @@ public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscrip
* @param Observer
*/
public SynchronizedObserver(Observer<? super T> Observer) {
this.observer = Observer;
this.subscription = new SafeObservableSubscription();
this(Observer, new SafeObservableSubscription());
}

public void onNext(T arg) {
if (finished || finishRequested || subscription.isUnsubscribed()) {
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
return;
}
synchronized (this) {
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || finishRequested || subscription.isUnsubscribed()) {
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
Expand All @@ -105,7 +113,7 @@ public void onError(Throwable e) {
return;
}
finishRequested = true;
synchronized (this) {
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || subscription.isUnsubscribed()) {
return;
Expand All @@ -121,7 +129,7 @@ public void onCompleted() {
return;
}
finishRequested = true;
synchronized (this) {
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || subscription.isUnsubscribed()) {
return;
Expand Down Expand Up @@ -188,6 +196,46 @@ public void testMultiThreadedBasic() {
assertEquals(1, busyObserver.maxConcurrentThreads.get());
}

@Test
public void testMultiThreadedBasicWithLock() {
Subscription s = mock(Subscription.class);
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three");
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();

Object lock = new Object();
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);

SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);

externalBusyThread.start();

w.subscribe(aw);
onSubscribe.waitToFinish();

try {
externalBusyThread.join(10000);
assertFalse(externalBusyThread.isAlive());
assertFalse(externalBusyThread.fail);
} catch (InterruptedException e) {
// ignore
}

assertEquals(3, busyObserver.onNextCount.get());
assertFalse(busyObserver.onError);
assertTrue(busyObserver.onCompleted);
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
// so commenting out for now as this is not a critical thing to test here
// verify(s, times(1)).unsubscribe();

// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busyObserver.maxConcurrentThreads.get());
}

@Test
public void testMultiThreadedWithNPE() {
Subscription s = mock(Subscription.class);
Expand Down Expand Up @@ -220,6 +268,52 @@ public void testMultiThreadedWithNPE() {
assertEquals(1, busyObserver.maxConcurrentThreads.get());
}

@Test
public void testMultiThreadedWithNPEAndLock() {
Subscription s = mock(Subscription.class);
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();

Object lock = new Object();
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);

SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);

externalBusyThread.start();

w.subscribe(aw);
onSubscribe.waitToFinish();

try {
externalBusyThread.join(10000);
assertFalse(externalBusyThread.isAlive());
assertFalse(externalBusyThread.fail);
} catch (InterruptedException e) {
// ignore
}

System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());

// we can't know how many onNext calls will occur since they each run on a separate thread
// that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
// assertEquals(3, busyObserver.onNextCount.get());
assertTrue(busyObserver.onNextCount.get() < 4);
assertTrue(busyObserver.onError);
// no onCompleted because onError was invoked
assertFalse(busyObserver.onCompleted);
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
// so commenting out for now as this is not a critical thing to test here
//verify(s, times(1)).unsubscribe();

// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busyObserver.maxConcurrentThreads.get());
}

@Test
public void testMultiThreadedWithNPEinMiddle() {
Subscription s = mock(Subscription.class);
Expand Down Expand Up @@ -250,6 +344,50 @@ public void testMultiThreadedWithNPEinMiddle() {
assertEquals(1, busyObserver.maxConcurrentThreads.get());
}

@Test
public void testMultiThreadedWithNPEinMiddleAndLock() {
Subscription s = mock(Subscription.class);
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();

Object lock = new Object();
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);

SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);

externalBusyThread.start();

w.subscribe(aw);
onSubscribe.waitToFinish();

try {
externalBusyThread.join(10000);
assertFalse(externalBusyThread.isAlive());
assertFalse(externalBusyThread.fail);
} catch (InterruptedException e) {
// ignore
}

System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// this should not be the full number of items since the error should stop it before it completes all 9
System.out.println("onNext count: " + busyObserver.onNextCount.get());
assertTrue(busyObserver.onNextCount.get() < 9);
assertTrue(busyObserver.onError);
// no onCompleted because onError was invoked
assertFalse(busyObserver.onCompleted);
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
// so commenting out for now as this is not a critical thing to test here
// verify(s, times(1)).unsubscribe();

// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busyObserver.maxConcurrentThreads.get());
}

/**
* A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order
* events on many threads.
Expand Down Expand Up @@ -617,14 +755,32 @@ private static class BusyObserver implements Observer<String> {

@Override
public void onCompleted() {
threadsRunning.incrementAndGet();

System.out.println(">>> BusyObserver received onCompleted");
onCompleted = true;

int concurrentThreads = threadsRunning.get();
int maxThreads = maxConcurrentThreads.get();
if (concurrentThreads > maxThreads) {
maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads);
}
threadsRunning.decrementAndGet();
}

@Override
public void onError(Throwable e) {
threadsRunning.incrementAndGet();

System.out.println(">>> BusyObserver received onError: " + e.getMessage());
onError = true;

int concurrentThreads = threadsRunning.get();
int maxThreads = maxConcurrentThreads.get();
if (concurrentThreads > maxThreads) {
maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads);
}
threadsRunning.decrementAndGet();
}

@Override
Expand Down Expand Up @@ -652,6 +808,70 @@ public void onNext(String args) {

}

private static class ExternalBusyThread extends Thread {

private BusyObserver observer;
private Object lock;
private int lockTimes;
private int waitTime;
public volatile boolean fail;

public ExternalBusyThread(BusyObserver observer, Object lock, int lockTimes, int waitTime) {
this.observer = observer;
this.lock = lock;
this.lockTimes = lockTimes;
this.waitTime = waitTime;
this.fail = false;
}

@Override
public void run() {
Random r = new Random();
for (int i = 0; i < lockTimes; i++) {
synchronized (lock) {
int oldOnNextCount = observer.onNextCount.get();
boolean oldOnCompleted = observer.onCompleted;
boolean oldOnError = observer.onError;
try {
Thread.sleep(r.nextInt(waitTime));
} catch (InterruptedException e) {
// ignore
}
// Since we own the lock, onNextCount, onCompleted and
// onError must not be changed.
int newOnNextCount = observer.onNextCount.get();
boolean newOnCompleted = observer.onCompleted;
boolean newOnError = observer.onError;
if (oldOnNextCount != newOnNextCount) {
System.out.println(">>> ExternalBusyThread received different onNextCount: "
+ oldOnNextCount
+ " -> "
+ newOnNextCount);
fail = true;
break;
}
if (oldOnCompleted != newOnCompleted) {
System.out.println(">>> ExternalBusyThread received different onCompleted: "
+ oldOnCompleted
+ " -> "
+ newOnCompleted);
fail = true;
break;
}
if (oldOnError != newOnError) {
System.out.println(">>> ExternalBusyThread received different onError: "
+ oldOnError
+ " -> "
+ newOnError);
fail = true;
break;
}
}
}
}

}

}

}