diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c21913f86a..0d36d682ef 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1822,6 +1822,27 @@ public Observable synchronize() { return create(OperationSynchronize.synchronize(this)); } + /** + * Accepts an Observable and wraps it in another Observable that ensures that the resulting + * Observable is chronologically well-behaved. This is accomplished by acquiring a mutual-exclusion lock for the object provided as the lock parameter. + *

+ * + *

+ * A well-behaved Observable does not interleave its invocations of the {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and {@link Observer#onError onError} methods of + * its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}. + * {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously. + * + * @param lock + * The lock object to synchronize each observer call on + * @param + * the type of item emitted by the source Observable + * @return an Observable that is a chronologically well-behaved version of the source + * Observable, and that synchronously notifies its {@link Observer}s + */ + public Observable synchronize(Object lock) { + return create(OperationSynchronize.synchronize(this, lock)); + } + /** * @deprecated Replaced with instance method. */ diff --git a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java index 129728c303..b7cd689102 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java @@ -58,21 +58,46 @@ public final class OperationSynchronize { * @return the wrapped synchronized observable sequence */ public static OnSubscribeFunc synchronize(Observable observable) { - return new Synchronize(observable); + return new Synchronize(observable, null); + } + + /** + * Accepts an observable and wraps it in another observable which ensures that the resulting observable is well-behaved. + * This is accomplished by acquiring a mutual-exclusion lock for the object provided as the lock parameter. + * + * A well-behaved observable ensures onNext, onCompleted, or onError calls to its subscribers are + * not interleaved, onCompleted and onError are only called once respectively, and no + * onNext calls follow onCompleted and onError calls. + * + * @param observable + * @param lock + * The lock object to synchronize each observer call on + * @param + * @return the wrapped synchronized observable sequence + */ + public static OnSubscribeFunc synchronize(Observable observable, Object lock) { + return new Synchronize(observable, lock); } private static class Synchronize implements OnSubscribeFunc { - public Synchronize(Observable innerObservable) { + public Synchronize(Observable innerObservable, Object lock) { this.innerObservable = innerObservable; + this.lock = lock; } private Observable innerObservable; private SynchronizedObserver atomicObserver; + private Object lock; public Subscription onSubscribe(Observer observer) { SafeObservableSubscription subscription = new SafeObservableSubscription(); - atomicObserver = new SynchronizedObserver(observer, subscription); + if(lock == null) { + atomicObserver = new SynchronizedObserver(observer, subscription); + } + else { + atomicObserver = new SynchronizedObserver(observer, subscription, lock); + } return subscription.wrap(innerObservable.subscribe(atomicObserver)); } diff --git a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java index fff3fccbf8..9a6b14d09f 100644 --- a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java @@ -19,6 +19,7 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -68,10 +69,18 @@ public final class SynchronizedObserver implements Observer { private final SafeObservableSubscription subscription; private volatile boolean finishRequested = false; private volatile boolean finished = false; + private volatile Object lock; public SynchronizedObserver(Observer Observer, SafeObservableSubscription subscription) { this.observer = Observer; this.subscription = subscription; + this.lock = this; + } + + public SynchronizedObserver(Observer Observer, SafeObservableSubscription subscription, Object lock) { + this.observer = Observer; + this.subscription = subscription; + this.lock = lock; } /** @@ -80,8 +89,7 @@ public SynchronizedObserver(Observer Observer, SafeObservableSubscrip * @param Observer */ public SynchronizedObserver(Observer Observer) { - this.observer = Observer; - this.subscription = new SafeObservableSubscription(); + this(Observer, new SafeObservableSubscription()); } public void onNext(T arg) { @@ -89,7 +97,7 @@ public void onNext(T arg) { // if we're already stopped, or a finish request has been received, we won't allow further onNext requests return; } - synchronized (this) { + synchronized (lock) { // check again since this could have changed while waiting if (finished || finishRequested || subscription.isUnsubscribed()) { // if we're already stopped, or a finish request has been received, we won't allow further onNext requests @@ -105,7 +113,7 @@ public void onError(Throwable e) { return; } finishRequested = true; - synchronized (this) { + synchronized (lock) { // check again since this could have changed while waiting if (finished || subscription.isUnsubscribed()) { return; @@ -121,7 +129,7 @@ public void onCompleted() { return; } finishRequested = true; - synchronized (this) { + synchronized (lock) { // check again since this could have changed while waiting if (finished || subscription.isUnsubscribed()) { return; @@ -188,6 +196,46 @@ public void testMultiThreadedBasic() { assertEquals(1, busyObserver.maxConcurrentThreads.get()); } + @Test + public void testMultiThreadedBasicWithLock() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + SafeObservableSubscription as = new SafeObservableSubscription(s); + BusyObserver busyObserver = new BusyObserver(); + + Object lock = new Object(); + ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100); + + SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as, lock); + + externalBusyThread.start(); + + w.subscribe(aw); + onSubscribe.waitToFinish(); + + try { + externalBusyThread.join(10000); + assertFalse(externalBusyThread.isAlive()); + assertFalse(externalBusyThread.fail); + } catch (InterruptedException e) { + // ignore + } + + assertEquals(3, busyObserver.onNextCount.get()); + assertFalse(busyObserver.onError); + assertTrue(busyObserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyObserver.maxConcurrentThreads.get()); + } + @Test public void testMultiThreadedWithNPE() { Subscription s = mock(Subscription.class); @@ -220,6 +268,52 @@ public void testMultiThreadedWithNPE() { assertEquals(1, busyObserver.maxConcurrentThreads.get()); } + @Test + public void testMultiThreadedWithNPEAndLock() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); + Observable w = Observable.create(onSubscribe); + + SafeObservableSubscription as = new SafeObservableSubscription(s); + BusyObserver busyObserver = new BusyObserver(); + + Object lock = new Object(); + ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100); + + SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as, lock); + + externalBusyThread.start(); + + w.subscribe(aw); + onSubscribe.waitToFinish(); + + try { + externalBusyThread.join(10000); + assertFalse(externalBusyThread.isAlive()); + assertFalse(externalBusyThread.fail); + } catch (InterruptedException e) { + // ignore + } + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + + // we can't know how many onNext calls will occur since they each run on a separate thread + // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options + // assertEquals(3, busyObserver.onNextCount.get()); + assertTrue(busyObserver.onNextCount.get() < 4); + assertTrue(busyObserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyObserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + //verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyObserver.maxConcurrentThreads.get()); + } + @Test public void testMultiThreadedWithNPEinMiddle() { Subscription s = mock(Subscription.class); @@ -250,6 +344,50 @@ public void testMultiThreadedWithNPEinMiddle() { assertEquals(1, busyObserver.maxConcurrentThreads.get()); } + @Test + public void testMultiThreadedWithNPEinMiddleAndLock() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); + Observable w = Observable.create(onSubscribe); + + SafeObservableSubscription as = new SafeObservableSubscription(s); + BusyObserver busyObserver = new BusyObserver(); + + Object lock = new Object(); + ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100); + + SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as, lock); + + externalBusyThread.start(); + + w.subscribe(aw); + onSubscribe.waitToFinish(); + + try { + externalBusyThread.join(10000); + assertFalse(externalBusyThread.isAlive()); + assertFalse(externalBusyThread.fail); + } catch (InterruptedException e) { + // ignore + } + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + // this should not be the full number of items since the error should stop it before it completes all 9 + System.out.println("onNext count: " + busyObserver.onNextCount.get()); + assertTrue(busyObserver.onNextCount.get() < 9); + assertTrue(busyObserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyObserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyObserver.maxConcurrentThreads.get()); + } + /** * A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order * events on many threads. @@ -617,14 +755,32 @@ private static class BusyObserver implements Observer { @Override public void onCompleted() { + threadsRunning.incrementAndGet(); + System.out.println(">>> BusyObserver received onCompleted"); onCompleted = true; + + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); } @Override public void onError(Throwable e) { + threadsRunning.incrementAndGet(); + System.out.println(">>> BusyObserver received onError: " + e.getMessage()); onError = true; + + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); } @Override @@ -652,6 +808,70 @@ public void onNext(String args) { } + private static class ExternalBusyThread extends Thread { + + private BusyObserver observer; + private Object lock; + private int lockTimes; + private int waitTime; + public volatile boolean fail; + + public ExternalBusyThread(BusyObserver observer, Object lock, int lockTimes, int waitTime) { + this.observer = observer; + this.lock = lock; + this.lockTimes = lockTimes; + this.waitTime = waitTime; + this.fail = false; + } + + @Override + public void run() { + Random r = new Random(); + for (int i = 0; i < lockTimes; i++) { + synchronized (lock) { + int oldOnNextCount = observer.onNextCount.get(); + boolean oldOnCompleted = observer.onCompleted; + boolean oldOnError = observer.onError; + try { + Thread.sleep(r.nextInt(waitTime)); + } catch (InterruptedException e) { + // ignore + } + // Since we own the lock, onNextCount, onCompleted and + // onError must not be changed. + int newOnNextCount = observer.onNextCount.get(); + boolean newOnCompleted = observer.onCompleted; + boolean newOnError = observer.onError; + if (oldOnNextCount != newOnNextCount) { + System.out.println(">>> ExternalBusyThread received different onNextCount: " + + oldOnNextCount + + " -> " + + newOnNextCount); + fail = true; + break; + } + if (oldOnCompleted != newOnCompleted) { + System.out.println(">>> ExternalBusyThread received different onCompleted: " + + oldOnCompleted + + " -> " + + newOnCompleted); + fail = true; + break; + } + if (oldOnError != newOnError) { + System.out.println(">>> ExternalBusyThread received different onError: " + + oldOnError + + " -> " + + newOnError); + fail = true; + break; + } + } + } + } + + } + } } \ No newline at end of file