diff --git a/src/main/java/rx/AsyncEmitter.java b/src/main/java/rx/AsyncEmitter.java new file mode 100644 index 0000000000..6235db4f60 --- /dev/null +++ b/src/main/java/rx/AsyncEmitter.java @@ -0,0 +1,82 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx; + +import rx.annotations.Experimental; + +/** + * Abstraction over a RxJava Subscriber that allows associating + * a resource with it and exposes the current number of downstream + * requested amount. + *

+ * The onNext, onError and onCompleted methods should be called + * in a sequential manner, just like the Observer's methods. The + * other methods are threadsafe. + * + * @param the value type to emit + */ +@Experimental +public interface AsyncEmitter extends Observer { + + /** + * Sets a Subscription on this emitter; any previous Subscription + * or Cancellation will be unsubscribed/cancelled. + * @param s the subscription, null is allowed + */ + void setSubscription(Subscription s); + + /** + * Sets a Cancellable on this emitter; any previous Subscription + * or Cancellation will be unsubscribed/cancelled. + * @param c the cancellable resource, null is allowed + */ + void setCancellation(Cancellable c); + /** + * The current outstanding request amount. + *

This method it threadsafe. + * @return the current outstanding request amount + */ + long requested(); + + /** + * A functional interface that has a single close method + * that can throw. + */ + interface Cancellable { + + /** + * Cancel the action or free a resource. + * @throws Exception on error + */ + void cancel() throws Exception; + } + + /** + * Options to handle backpressure in the emitter. + */ + enum BackpressureMode { + NONE, + + ERROR, + + BUFFER, + + DROP, + + LATEST + } +} diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 98b47ec796..b0c405db55 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -1680,6 +1680,50 @@ public static Observable from(T[] array) { return create(new OnSubscribeFromArray(array)); } + /** + * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style, + * generally non-backpressured world. + *

+ * Example: + *


+     * Observable.<Event>fromAsync(emitter -> {
+     *     Callback listener = new Callback() {
+     *         @Override
+     *         public void onEvent(Event e) {
+     *             emitter.onNext(e);
+     *             if (e.isLast()) {
+     *                 emitter.onCompleted();
+     *             }
+     *         }
+     *         
+     *         @Override
+     *         public void onFailure(Exception e) {
+     *             emitter.onError(e);
+     *         }
+     *     };
+     *     
+     *     AutoCloseable c = api.someMethod(listener);
+     *     
+     *     emitter.setCancellable(c::close);
+     *     
+     * }, BackpressureMode.BUFFER);
+     * 
+ *

+ * You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The + * rest of its methods are threadsafe. + * + * @param asyncEmitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable} + * @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough + * @return the new Observable instance + * @see AsyncEmitter + * @see AsyncEmitter.BackpressureMode + * @see AsyncEmitter.Cancellable + */ + @Experimental + public static Observable fromAsync(Action1> asyncEmitter, AsyncEmitter.BackpressureMode backpressure) { + return create(new OnSubscribeFromAsync(asyncEmitter, backpressure)); + } + /** * Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then * emits the value returned from that function. diff --git a/src/main/java/rx/internal/operators/BackpressureUtils.java b/src/main/java/rx/internal/operators/BackpressureUtils.java index 5d63ab7c27..d627ef22b5 100644 --- a/src/main/java/rx/internal/operators/BackpressureUtils.java +++ b/src/main/java/rx/internal/operators/BackpressureUtils.java @@ -69,14 +69,14 @@ public static long getAndAddRequest(AtomicLongFieldUpdater requested, T o } /** - * Adds {@code n} to {@code requested} and returns the value prior to addition once the + * Adds {@code n} (not validated) to {@code requested} and returns the value prior to addition once the * addition is successful (uses CAS semantics). If overflows then sets * {@code requested} field to {@code Long.MAX_VALUE}. * * @param requested * atomic long that should be updated * @param n - * the number of requests to add to the requested count + * the number of requests to add to the requested count, positive (not validated) * @return requested value just prior to successful addition */ public static long getAndAddRequest(AtomicLong requested, long n) { @@ -413,4 +413,17 @@ public static long produced(AtomicLong requested, long n) { } } } + + /** + * Validates the requested amount and returns true if it is positive. + * @param n the requested amount + * @return true if n is positive + * @throws IllegalArgumentException if n is negative + */ + public static boolean validate(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + return n != 0L; + } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromAsync.java b/src/main/java/rx/internal/operators/OnSubscribeFromAsync.java new file mode 100644 index 0000000000..1aa42954a1 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeFromAsync.java @@ -0,0 +1,534 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.Queue; +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.exceptions.*; +import rx.functions.Action1; +import rx.internal.util.RxRingBuffer; +import rx.internal.util.atomic.SpscUnboundedAtomicArrayQueue; +import rx.internal.util.unsafe.*; +import rx.plugins.RxJavaHooks; +import rx.subscriptions.SerialSubscription; + +public final class OnSubscribeFromAsync implements OnSubscribe { + + final Action1> asyncEmitter; + + final AsyncEmitter.BackpressureMode backpressure; + + public OnSubscribeFromAsync(Action1> asyncEmitter, AsyncEmitter.BackpressureMode backpressure) { + this.asyncEmitter = asyncEmitter; + this.backpressure = backpressure; + } + + @Override + public void call(Subscriber t) { + BaseAsyncEmitter emitter; + + switch (backpressure) { + case NONE: { + emitter = new NoneAsyncEmitter(t); + break; + } + case ERROR: { + emitter = new ErrorAsyncEmitter(t); + break; + } + case DROP: { + emitter = new DropAsyncEmitter(t); + break; + } + case LATEST: { + emitter = new LatestAsyncEmitter(t); + break; + } + default: { + emitter = new BufferAsyncEmitter(t, RxRingBuffer.SIZE); + break; + } + } + + t.add(emitter); + t.setProducer(emitter); + asyncEmitter.call(emitter); + + } + + static final class CancellableSubscription + extends AtomicReference + implements Subscription { + + /** */ + private static final long serialVersionUID = 5718521705281392066L; + + public CancellableSubscription(AsyncEmitter.Cancellable cancellable) { + super(cancellable); + } + + @Override + public boolean isUnsubscribed() { + return get() == null; + } + + @Override + public void unsubscribe() { + if (get() != null) { + AsyncEmitter.Cancellable c = getAndSet(null); + if (c != null) { + try { + c.cancel(); + } catch (Exception ex) { + Exceptions.throwIfFatal(ex); + RxJavaHooks.onError(ex); + } + } + } + } + } + + static abstract class BaseAsyncEmitter + extends AtomicLong + implements AsyncEmitter, Producer, Subscription { + /** */ + private static final long serialVersionUID = 7326289992464377023L; + + final Subscriber actual; + + final SerialSubscription serial; + + public BaseAsyncEmitter(Subscriber actual) { + this.actual = actual; + this.serial = new SerialSubscription(); + } + + @Override + public void onCompleted() { + if (actual.isUnsubscribed()) { + return; + } + try { + actual.onCompleted(); + } finally { + serial.unsubscribe(); + } + } + + @Override + public void onError(Throwable e) { + if (actual.isUnsubscribed()) { + return; + } + try { + actual.onError(e); + } finally { + serial.unsubscribe(); + } + } + + @Override + public final void unsubscribe() { + serial.unsubscribe(); + onUnsubscribed(); + } + + void onUnsubscribed() { + // default is no-op + } + + @Override + public final boolean isUnsubscribed() { + return serial.isUnsubscribed(); + } + + @Override + public final void request(long n) { + if (BackpressureUtils.validate(n)) { + BackpressureUtils.getAndAddRequest(this, n); + onRequested(); + } + } + + void onRequested() { + // default is no-op + } + + @Override + public final void setSubscription(Subscription s) { + serial.set(s); + } + + @Override + public final void setCancellation(AsyncEmitter.Cancellable c) { + setSubscription(new CancellableSubscription(c)); + } + + @Override + public final long requested() { + return get(); + } + } + + static final class NoneAsyncEmitter extends BaseAsyncEmitter { + + /** */ + private static final long serialVersionUID = 3776720187248809713L; + + public NoneAsyncEmitter(Subscriber actual) { + super(actual); + } + + @Override + public void onNext(T t) { + if (actual.isUnsubscribed()) { + return; + } + + actual.onNext(t); + + for (;;) { + long r = get(); + if (r == 0L || compareAndSet(r, r - 1)) { + return; + } + } + } + + } + + static abstract class NoOverflowBaseAsyncEmitter extends BaseAsyncEmitter { + + /** */ + private static final long serialVersionUID = 4127754106204442833L; + + public NoOverflowBaseAsyncEmitter(Subscriber actual) { + super(actual); + } + + @Override + public final void onNext(T t) { + if (actual.isUnsubscribed()) { + return; + } + + if (get() != 0) { + actual.onNext(t); + BackpressureUtils.produced(this, 1); + } else { + onOverflow(); + } + } + + abstract void onOverflow(); + } + + static final class DropAsyncEmitter extends NoOverflowBaseAsyncEmitter { + + /** */ + private static final long serialVersionUID = 8360058422307496563L; + + public DropAsyncEmitter(Subscriber actual) { + super(actual); + } + + @Override + void onOverflow() { + // nothing to do + } + + } + + static final class ErrorAsyncEmitter extends NoOverflowBaseAsyncEmitter { + + /** */ + private static final long serialVersionUID = 338953216916120960L; + + public ErrorAsyncEmitter(Subscriber actual) { + super(actual); + } + + @Override + void onOverflow() { + onError(new MissingBackpressureException("fromAsync: could not emit value due to lack of requests")); + } + + } + + static final class BufferAsyncEmitter extends BaseAsyncEmitter { + + /** */ + private static final long serialVersionUID = 2427151001689639875L; + + final Queue queue; + + Throwable error; + volatile boolean done; + + final AtomicInteger wip; + + final NotificationLite nl; + + public BufferAsyncEmitter(Subscriber actual, int capacityHint) { + super(actual); + this.queue = UnsafeAccess.isUnsafeAvailable() + ? new SpscUnboundedArrayQueue(capacityHint) + : new SpscUnboundedAtomicArrayQueue(capacityHint); + this.wip = new AtomicInteger(); + this.nl = NotificationLite.instance(); + } + + @Override + public void onNext(T t) { + queue.offer(nl.next(t)); + drain(); + } + + @Override + public void onError(Throwable e) { + error = e; + done = true; + drain(); + } + + @Override + public void onCompleted() { + done = true; + drain(); + } + + @Override + void onRequested() { + drain(); + } + + @Override + void onUnsubscribed() { + if (wip.getAndIncrement() == 0) { + queue.clear(); + } + } + + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + int missed = 1; + final Subscriber a = actual; + final Queue q = queue; + + for (;;) { + long r = get(); + long e = 0L; + + while (e != r) { + if (a.isUnsubscribed()) { + q.clear(); + return; + } + + boolean d = done; + + Object o = q.poll(); + + boolean empty = o == null; + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + super.onError(ex); + } else { + super.onCompleted(); + } + return; + } + + if (empty) { + break; + } + + a.onNext(nl.getValue(o)); + + e++; + } + + if (e == r) { + if (a.isUnsubscribed()) { + q.clear(); + return; + } + + boolean d = done; + + boolean empty = q.isEmpty(); + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + super.onError(ex); + } else { + super.onCompleted(); + } + return; + } + } + + if (e != 0) { + BackpressureUtils.produced(this, e); + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + } + + static final class LatestAsyncEmitter extends BaseAsyncEmitter { + + /** */ + private static final long serialVersionUID = 4023437720691792495L; + + final AtomicReference queue; + + Throwable error; + volatile boolean done; + + final AtomicInteger wip; + + final NotificationLite nl; + + public LatestAsyncEmitter(Subscriber actual) { + super(actual); + this.queue = new AtomicReference(); + this.wip = new AtomicInteger(); + this.nl = NotificationLite.instance(); + } + + @Override + public void onNext(T t) { + queue.set(nl.next(t)); + drain(); + } + + @Override + public void onError(Throwable e) { + error = e; + done = true; + drain(); + } + + @Override + public void onCompleted() { + done = true; + drain(); + } + + @Override + void onRequested() { + drain(); + } + + @Override + void onUnsubscribed() { + if (wip.getAndIncrement() == 0) { + queue.lazySet(null); + } + } + + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + int missed = 1; + final Subscriber a = actual; + final AtomicReference q = queue; + + for (;;) { + long r = get(); + long e = 0L; + + while (e != r) { + if (a.isUnsubscribed()) { + q.lazySet(null); + return; + } + + boolean d = done; + + Object o = q.getAndSet(null); + + boolean empty = o == null; + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + super.onError(ex); + } else { + super.onCompleted(); + } + return; + } + + if (empty) { + break; + } + + a.onNext(nl.getValue(o)); + + e++; + } + + if (e == r) { + if (a.isUnsubscribed()) { + q.lazySet(null); + return; + } + + boolean d = done; + + boolean empty = q.get() == null; + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + super.onError(ex); + } else { + super.onCompleted(); + } + return; + } + } + + if (e != 0) { + BackpressureUtils.produced(this, e); + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + } + +} diff --git a/src/test/java/rx/internal/operators/OnSubscribeFromAsyncTest.java b/src/test/java/rx/internal/operators/OnSubscribeFromAsyncTest.java new file mode 100644 index 0000000000..26dd1f75f6 --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeFromAsyncTest.java @@ -0,0 +1,727 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import org.junit.*; + +import rx.*; +import rx.exceptions.*; +import rx.functions.Action1; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +public class OnSubscribeFromAsyncTest { + + PublishAsyncEmitter source; + + PublishAsyncEmitterNoCancel sourceNoCancel; + + TestSubscriber ts; + + @Before + public void before() { + source = new PublishAsyncEmitter(); + sourceNoCancel = new PublishAsyncEmitterNoCancel(); + ts = TestSubscriber.create(0L); + } + + @Test + public void normalBuffered() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.requestMore(1); + + ts.assertValue(1); + + Assert.assertEquals(0, source.requested()); + + ts.requestMore(1); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void normalDrop() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.DROP).subscribe(ts); + + source.onNext(1); + + ts.requestMore(1); + + source.onNext(2); + source.onCompleted(); + + ts.assertValues(2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void normalLatest() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts); + + source.onNext(1); + + source.onNext(2); + source.onCompleted(); + + ts.requestMore(1); + + ts.assertValues(2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void normalNone() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.NONE).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void normalNoneRequested() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.NONE).subscribe(ts); + ts.requestMore(2); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + + @Test + public void normalError() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.ERROR).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.assertNoValues(); + ts.assertError(MissingBackpressureException.class); + ts.assertNotCompleted(); + + Assert.assertEquals("fromAsync: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage()); + } + + @Test + public void errorBuffered() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertValue(1); + + ts.requestMore(1); + + ts.assertValues(1, 2); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void errorLatest() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertValues(2); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void errorNone() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.NONE).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertValues(1, 2); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedBuffer() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + ts.unsubscribe(); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedLatest() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts); + ts.unsubscribe(); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedError() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.ERROR).subscribe(ts); + ts.unsubscribe(); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedDrop() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.DROP).subscribe(ts); + ts.unsubscribe(); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedNone() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.NONE).subscribe(ts); + ts.unsubscribe(); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedNoCancelBuffer() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + ts.unsubscribe(); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + sourceNoCancel.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedNoCancelLatest() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts); + ts.unsubscribe(); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + sourceNoCancel.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedNoCancelError() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.ERROR).subscribe(ts); + ts.unsubscribe(); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + sourceNoCancel.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedNoCancelDrop() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.DROP).subscribe(ts); + ts.unsubscribe(); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + sourceNoCancel.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void unsubscribedNoCancelNone() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.NONE).subscribe(ts); + ts.unsubscribe(); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + sourceNoCancel.onError(new TestException()); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + @Test + public void deferredRequest() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void take() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.BUFFER).take(2).subscribe(ts); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void takeOne() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.BUFFER).take(1).subscribe(ts); + ts.requestMore(2); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void requestExact() { + Observable.fromAsync(source, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + ts.requestMore(2); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void takeNoCancel() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).take(2).subscribe(ts); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + sourceNoCancel.onCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void takeOneNoCancel() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).take(1).subscribe(ts); + ts.requestMore(2); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + sourceNoCancel.onCompleted(); + + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void unsubscribeNoCancel() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + ts.requestMore(2); + + sourceNoCancel.onNext(1); + + ts.unsubscribe(); + + sourceNoCancel.onNext(2); + + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertNotCompleted(); + } + + + @Test + public void unsubscribeInline() { + TestSubscriber ts1 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + unsubscribe(); + } + }; + + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts1); + + sourceNoCancel.onNext(1); + + ts1.assertValues(1); + ts1.assertNoErrors(); + ts1.assertNotCompleted(); + } + + @Test + public void completeInline() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + + sourceNoCancel.onNext(1); + sourceNoCancel.onCompleted(); + + ts.requestMore(2); + + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void errorInline() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts); + + sourceNoCancel.onNext(1); + sourceNoCancel.onError(new TestException()); + + ts.requestMore(2); + + ts.assertValues(1); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void requestInline() { + TestSubscriber ts1 = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + requestMore(1); + } + }; + + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.BUFFER).subscribe(ts1); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + + ts1.assertValues(1, 2); + ts1.assertNoErrors(); + ts1.assertNotCompleted(); + } + + @Test + public void unsubscribeInlineLatest() { + TestSubscriber ts1 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + unsubscribe(); + } + }; + + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts1); + + sourceNoCancel.onNext(1); + + ts1.assertValues(1); + ts1.assertNoErrors(); + ts1.assertNotCompleted(); + } + + @Test + public void unsubscribeInlineExactLatest() { + TestSubscriber ts1 = new TestSubscriber(1L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + unsubscribe(); + } + }; + + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts1); + + sourceNoCancel.onNext(1); + + ts1.assertValues(1); + ts1.assertNoErrors(); + ts1.assertNotCompleted(); + } + + @Test + public void completeInlineLatest() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts); + + sourceNoCancel.onNext(1); + sourceNoCancel.onCompleted(); + + ts.requestMore(2); + + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void completeInlineExactLatest() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts); + + sourceNoCancel.onNext(1); + sourceNoCancel.onCompleted(); + + ts.requestMore(1); + + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void errorInlineLatest() { + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts); + + sourceNoCancel.onNext(1); + sourceNoCancel.onError(new TestException()); + + ts.requestMore(2); + + ts.assertValues(1); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void requestInlineLatest() { + TestSubscriber ts1 = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + requestMore(1); + } + }; + + Observable.fromAsync(sourceNoCancel, AsyncEmitter.BackpressureMode.LATEST).subscribe(ts1); + + sourceNoCancel.onNext(1); + sourceNoCancel.onNext(2); + + ts1.assertValues(1, 2); + ts1.assertNoErrors(); + ts1.assertNotCompleted(); + } + + static final class PublishAsyncEmitter implements Action1>, Observer { + + final PublishSubject subject; + + AsyncEmitter current; + + public PublishAsyncEmitter() { + this.subject = PublishSubject.create(); + } + + long requested() { + return current.requested(); + } + + @Override + public void call(final AsyncEmitter t) { + + this.current = t; + + final Subscription s = subject.subscribe(new Observer() { + + @Override + public void onCompleted() { + t.onCompleted(); + } + + @Override + public void onError(Throwable e) { + t.onError(e); + } + + @Override + public void onNext(Integer v) { + t.onNext(v); + } + + }); + + t.setCancellation(new AsyncEmitter.Cancellable() { + @Override + public void cancel() throws Exception { + s.unsubscribe(); + } + });; + } + + @Override + public void onNext(Integer t) { + subject.onNext(t); + } + + @Override + public void onError(Throwable e) { + subject.onError(e); + } + + @Override + public void onCompleted() { + subject.onCompleted(); + } + } + + static final class PublishAsyncEmitterNoCancel implements Action1>, Observer { + + final PublishSubject subject; + + public PublishAsyncEmitterNoCancel() { + this.subject = PublishSubject.create(); + } + + @Override + public void call(final AsyncEmitter t) { + + subject.subscribe(new Observer() { + + @Override + public void onCompleted() { + t.onCompleted(); + } + + @Override + public void onError(Throwable e) { + t.onError(e); + } + + @Override + public void onNext(Integer v) { + t.onNext(v); + } + + }); + } + + @Override + public void onNext(Integer t) { + subject.onNext(t); + } + + @Override + public void onError(Throwable e) { + subject.onError(e); + } + + @Override + public void onCompleted() { + subject.onCompleted(); + } + } + +}