diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index b65fd94f5b..fe0fb3448a 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3289,7 +3289,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count) * @see ReactiveX operators documentation: Buffer */ public final Observable> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) { - return lift(new OperatorBufferWithTime(timespan, timespan, unit, count, scheduler)); + return lift(new OperatorBufferWithTimeAndSize(timespan, unit, count, scheduler)); } /** @@ -3320,7 +3320,7 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count, * @see ReactiveX operators documentation: Buffer */ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler scheduler) { - return buffer(timespan, timespan, unit, scheduler); + return buffer(timespan, unit, Integer.MAX_VALUE, scheduler); } /** diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithTimeAndSize.java b/src/main/java/rx/internal/operators/OperatorBufferWithTimeAndSize.java new file mode 100644 index 0000000000..ae39959845 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorBufferWithTimeAndSize.java @@ -0,0 +1,361 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Observable.Operator; +import rx.*; +import rx.Scheduler.Worker; +import rx.exceptions.*; +import rx.functions.Action0; +import rx.subscriptions.*; + +/** + * Buffers the source into Lists with maximum size or emission duration, respecting backpressure. + */ +public final class OperatorBufferWithTimeAndSize implements Operator, T> { + final int size; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + public OperatorBufferWithTimeAndSize(long time, TimeUnit unit, int size, Scheduler scheduler) { + this.size = size; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + @Override + public Subscriber call(Subscriber> child) { + Scheduler.Worker worker = scheduler.createWorker(); + child.add(worker); + + final BufferSubscriber bs = new BufferSubscriber(child, size, time, unit, worker); + + child.add(bs); + + child.setProducer(new BufferProducer(bs)); + + return bs; + } + + /** The buffering subscriber for the upstream. */ + static final class BufferSubscriber extends Subscriber { + + final Subscriber> child; + final int size; + final long time; + final TimeUnit unit; + final Worker worker; + + /** The producer of the upstream. */ + Producer producer; + + /** Tracks the downstream requested amounts. */ + final AtomicLong requested; + + /** Tracks the upstream requested amounts. */ + final AtomicLong upstreamRequested; + + /** Holds onto the current timer. */ + final SerialSubscription timer; + + /** The buffer holding the elements or null for a replaced buffer. Guarded by this. */ + List buffer; + + /** The current buffer identifier so timer doesn't emit an old buffer. Guarded by this. */ + long bufferId; + + /** Captures how much time was remaining in the last timeout, in milliseconds. Guarded by this. */ + long timeRemaining; + /** Stores the Worker.now()-relative value where the timer should fire, in milliseconds. Guarded by this. */ + long timeScheduled; + + public BufferSubscriber(Subscriber> child, int size, + long time, TimeUnit unit, Worker worker) { + this.child = child; + this.size = size; + this.time = time; + this.unit = unit; + this.worker = worker; + this.timeRemaining = unit.toMillis(time); + this.requested = new AtomicLong(); + this.upstreamRequested = new AtomicLong(); + this.timer = new SerialSubscription(); + this.add(timer); + } + + @Override + public void setProducer(Producer producer) { + this.producer = producer; + } + + @Override + public void onNext(T t) { + long ur = upstreamRequested.get(); + if (ur == 0) { + onError(new MissingBackpressureException()); + return; + } else + if (ur != Long.MAX_VALUE) { + upstreamRequested.decrementAndGet(); + } + + List list; + long r; + long id; + long delay; + synchronized (this) { + List b = buffer; + if (b == null) { + b = new ArrayList(); + buffer = b; + } + b.add(t); + if (b.size() == size) { + id = ++bufferId; + + list = buffer; + buffer = null; + + r = requested.get(); + if (r != Long.MAX_VALUE) { + delay = calculateNextDelay(); + r = requested.decrementAndGet(); + } else { + delay = -1; // irrelevant in unbounded mode + } + } else { + return; + } + } + scheduleTimer(r, id, delay); + child.onNext(list); + } + + /** Timeout when run in backpressure mode. */ + void timeout(long id) { + List list; + long r; + long delay; + synchronized (this) { + if (id == bufferId) { + list = buffer; + buffer = null; + + id = ++bufferId; + + if (list == null) { + list = new ArrayList(); + } + r = requested.get(); + if (r != Long.MAX_VALUE) { + delay = calculateNextDelay(); + r = requested.decrementAndGet(); + } else { + delay = -1; // irrelevant in unbounded mode + } + } else { + return; + } + } + scheduleTimer(r, id, delay); + child.onNext(list); + } + /** Timeout in unbounded mode. */ + void timeout() { + List list; + synchronized (this) { + list = buffer; + buffer = null; + + ++bufferId; + + if (list == null) { + list = new ArrayList(); + } + } + child.onNext(list); + } + + void scheduleTimer(long r, long id, long delay) { + if (r > 0 && r < Long.MAX_VALUE) { + timer.set(worker.schedule(new TimerAction(id), delay, unit)); + } + } + + /** Calculates the next delay in the unit accounting how much time was left from the previous timer. */ + long calculateNextDelay() { + long delay = timeScheduled - worker.now(); + if (delay <= 0) { + delay = time; + timeScheduled = worker.now() + unit.toMillis(time); + } else { + timeScheduled = worker.now() + delay; + delay = unit.convert(delay, TimeUnit.MILLISECONDS); + } + return delay; + } + + @Override + public void onError(Throwable e) { + timer.unsubscribe(); + try { + synchronized (this) { + buffer = null; + bufferId++; + } + requested.getAndSet(-1); // indicate to the downstream requester there won't be anything to request + + child.onError(e); + } finally { + unsubscribe(); + } + } + + @Override + public void onCompleted() { + timer.unsubscribe(); + try { + // either we win and emit the current buffer or the timer in which case + // there is no point in emitting an empty buffer + List list; + synchronized (this) { + list = buffer; + bufferId++; + } + requested.getAndSet(-1); // indicate to the downstream requester there won't be anything to request + if (list != null) { + try { + child.onNext(list); + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + child.onError(t); + return; + } + } + child.onCompleted(); + } finally { + unsubscribe(); + } + } + public void downstreamRequest(long n) { + if (n < 0) { + throw new IllegalArgumentException("Request is negative"); + } + if (n == 0) { + return; + } + for (;;) { + long r = requested.get(); + if (r < 0) { + return; + } + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + if (requested.compareAndSet(r, u)) { + handleRequested(r, n); + return; + } + } + } + /** + * Handles the change in the request amount. + * @param before the value before the request + * @param request the requested amount + */ + void handleRequested(long before, long request) { + long s = size; + long elements = request * s; + // s != 0 and request != 0 + if ((request >>> 31) != 0 && (elements / request != s)) { + elements = Long.MAX_VALUE; + } + if (before == 0) { + if (request != Long.MAX_VALUE) { + long id; + long delay; + + synchronized (this) { + id = bufferId; + delay = calculateNextDelay(); + } + + timer.set(worker.schedule(new TimerAction(id), delay, unit)); + } else { + timer.set(worker.schedulePeriodically(new PeriodicAction(), time, time, unit)); + } + } + for (;;) { + long r2 = upstreamRequested.get(); + long u2 = r2 + elements; + if (u2 < 0) { + u2 = Long.MAX_VALUE; + } + if (upstreamRequested.compareAndSet(r2, u2)) { + break; + } + } + + Producer p = producer; + if (p != null) { + p.request(elements); + } + } + /** + * The timer action trying to emit the buffer contents. + */ + class TimerAction implements Action0 { + final long id; + private TimerAction(long id) { + this.id = id; + } + @Override + public void call() { + timeout(id); + } + } + /** + * The timer action trying to emit the buffer contents. + */ + class PeriodicAction implements Action0 { + @Override + public void call() { + timeout(); + } + } + } + + /** + * The producer forwarding request calls to a BufferSubscriber. + * + * @param the emitted value type + */ + static final class BufferProducer implements Producer { + final BufferSubscriber bs; + public BufferProducer(BufferSubscriber bs) { + this.bs = bs; + } + @Override + public void request(long n) { + bs.downstreamRequest(n); + } + } +} diff --git a/src/main/java/rx/observers/TestSubscriber.java b/src/main/java/rx/observers/TestSubscriber.java index 709027ea97..3bd1e5c763 100644 --- a/src/main/java/rx/observers/TestSubscriber.java +++ b/src/main/java/rx/observers/TestSubscriber.java @@ -15,10 +15,13 @@ */ package rx.observers; -import java.util.List; +import java.util.*; import java.util.concurrent.*; import rx.*; +import rx.Observer; +import rx.annotations.Experimental; +import rx.exceptions.CompositeException; /** * A {@code TestSubscriber} is a variety of {@link Subscriber} that you can use for unit testing, to perform @@ -29,34 +32,72 @@ public class TestSubscriber extends Subscriber { private final TestObserver testObserver; private final CountDownLatch latch = new CountDownLatch(1); private volatile Thread lastSeenThread; + /** Holds the initial request value. */ + private final long initialRequest; + /** The shared no-op observer. */ + private static final Observer INERT = new Observer() { - public TestSubscriber(Subscriber delegate) { + @Override + public void onCompleted() { + // do nothing + } + + @Override + public void onError(Throwable e) { + // do nothing + } + + @Override + public void onNext(Object t) { + // do nothing + } + + }; + + /** + * Constructs a TestSubscriber with the initial request to be requested from upstream. + * @param initialRequest the initial request value, negative value will revert to the default unbounded behavior + */ + @SuppressWarnings("unchecked") + @Experimental + public TestSubscriber(long initialRequest) { + this((Observer)INERT, initialRequest); + } + + /** + * Constructs a TestSubscriber with the initial request to be requested from upstream + * and a delegate Observer to wrap. + * @param initialRequest the initial request value, negative value will revert to the default unbounded behavior + * @param delegate the Observer instance to wrap + */ + @Experimental + public TestSubscriber(Observer delegate, long initialRequest) { + if (delegate == null) { + throw new NullPointerException(); + } this.testObserver = new TestObserver(delegate); + this.initialRequest = initialRequest; + } + + public TestSubscriber(Subscriber delegate) { + this(delegate, -1); } public TestSubscriber(Observer delegate) { - this.testObserver = new TestObserver(delegate); + this(delegate, -1); } public TestSubscriber() { - this.testObserver = new TestObserver(new Observer() { - - @Override - public void onCompleted() { - // do nothing - } - - @Override - public void onError(Throwable e) { - // do nothing - } - - @Override - public void onNext(T t) { - // do nothing - } - - }); + this(-1); + } + + @Override + public void onStart() { + if (initialRequest >= 0) { + requestMore(initialRequest); + } else { + super.onStart(); + } } /** @@ -261,4 +302,121 @@ public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit uni public Thread getLastSeenThread() { return lastSeenThread; } + + /** + * Assert if there is exactly a single completion event. + */ + @Experimental + public void assertCompleted() { + int s = testObserver.getOnCompletedEvents().size(); + if (s == 0) { + throw new AssertionError("Not completed!"); + } else + if (s > 1) { + throw new AssertionError("Completed multiple times: " + s); + } + } + /** + * Assert if there is no completion event. + */ + @Experimental + public void assertNotCompleted() { + int s = testObserver.getOnCompletedEvents().size(); + if (s == 1) { + throw new AssertionError("Completed!"); + } else + if (s > 1) { + throw new AssertionError("Completed multiple times: " + s); + } + } + /** + * Assert if there is exactly one error event which is a subclass of the given class. + * @param clazz the class to check the error against. + */ + @Experimental + public void assertError(Class clazz) { + List err = testObserver.getOnErrorEvents(); + if (err.size() == 0) { + throw new AssertionError("No errors"); + } else + if (err.size() > 1) { + throw new AssertionError("Multiple errors: " + err.size(), new CompositeException(err)); + } else + if (!clazz.isInstance(err.get(0))) { + throw new AssertionError("Exceptions differ; expected: " + clazz + ", actual: " + err.get(0), err.get(0)); + } + } + /** + * Assert there is a single onError event with the exact exception. + * @param throwable the throwable to check + */ + @Experimental + public void assertError(Throwable throwable) { + List err = testObserver.getOnErrorEvents(); + if (err.size() == 0) { + throw new AssertionError("No errors"); + } else + if (err.size() > 1) { + throw new AssertionError("Multiple errors: " + err.size(), new CompositeException(err)); + } else + if (throwable.equals(err.get(0))) { + throw new AssertionError("Exceptions differ; expected: " + throwable + ", actual: " + err.get(0), err.get(0)); + } + } + /** + * Assert for no onError and onCompleted events. + */ + @Experimental + public void assertNoTerminalEvent() { + List err = testObserver.getOnErrorEvents(); + int s = testObserver.getOnCompletedEvents().size(); + if (err.size() > 0 || s > 0) { + if (err.isEmpty()) { + throw new AssertionError("Found " + err.size() + " errors and " + s + " completion events instead of none"); + } else + if (err.size() == 1) { + throw new AssertionError("Found " + err.size() + " errors and " + s + " completion events instead of none", err.get(0)); + } else { + throw new AssertionError("Found " + err.size() + " errors and " + s + " completion events instead of none", new CompositeException(err)); + } + } + } + /** + * Assert if there are no onNext events received. + */ + @Experimental + public void assertNoValues() { + int s = testObserver.getOnNextEvents().size(); + if (s > 0) { + throw new AssertionError("No onNext events expected yet some received: " + s); + } + } + /** + * Assert if the given number of onNext events are received. + * @param count the expected number of onNext events + */ + @Experimental + public void assertValueCount(int count) { + int s = testObserver.getOnNextEvents().size(); + if (s != count) { + throw new AssertionError("Number of onNext events differ; expected: " + count + ", actual: " + s); + } + } + + /** + * Assert if the received onNext events, in order, are the specified values. + * @param values the values to check + */ + @Experimental + public void assertValues(T... values) { + assertReceivedOnNext(Arrays.asList(values)); + } + /** + * Assert if there is only a single received onNext event. + * @param values the values to check + */ + @Experimental + public void assertValue(T value) { + assertReceivedOnNext(Collections.singletonList(value)); + } } diff --git a/src/test/java/rx/internal/operators/OperatorBufferTest.java b/src/test/java/rx/internal/operators/OperatorBufferTest.java index 8e0a9b614e..d00ee46966 100644 --- a/src/test/java/rx/internal/operators/OperatorBufferTest.java +++ b/src/test/java/rx/internal/operators/OperatorBufferTest.java @@ -15,38 +15,22 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; +import org.junit.*; +import org.mockito.*; +import rx.*; import rx.Observable; import rx.Observer; -import rx.Producer; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; import rx.exceptions.TestException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; +import rx.functions.*; import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; @@ -691,6 +675,7 @@ public void bufferWithTimeThrows() { verify(o, never()).onCompleted(); } + @Test public void bufferWithTimeAndSize() { Observable source = Observable.timer(30, 30, TimeUnit.MILLISECONDS, scheduler); diff --git a/src/test/java/rx/internal/operators/OperatorBufferWithTimeAndSizeTest.java b/src/test/java/rx/internal/operators/OperatorBufferWithTimeAndSizeTest.java new file mode 100644 index 0000000000..9abc8c81e9 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorBufferWithTimeAndSizeTest.java @@ -0,0 +1,142 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import rx.Observable; +import rx.exceptions.MissingBackpressureException; +import rx.observers.TestSubscriber; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; + +public class OperatorBufferWithTimeAndSizeTest { + @Test + public void testNoRequest() { + TestScheduler scheduler = new TestScheduler(); + Observable> source = Observable.range(1, 100).buffer(100, TimeUnit.MILLISECONDS, 10, scheduler); + + TestSubscriber> ts = new TestSubscriber>(0); + + source.subscribe(ts); + + scheduler.advanceTimeBy(2, TimeUnit.SECONDS); + + ts.assertNoErrors(); + ts.assertNoValues(); + ts.assertNotCompleted(); + } + @Test + public void testSingleRequest() { + TestScheduler scheduler = new TestScheduler(); + Observable> source = Observable.range(1, 100).buffer(100, TimeUnit.MILLISECONDS, 10, scheduler); + + TestSubscriber> ts = new TestSubscriber>(0); + + source.subscribe(ts); + + ts.assertNoErrors(); + ts.assertNoValues(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + scheduler.advanceTimeBy(2, TimeUnit.SECONDS); + + ts.assertNoErrors(); + ts.assertNotCompleted(); + ts.assertReceivedOnNext(Collections.singletonList(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))); + + } + + @Test + public void testMissingBackpressure() { + TestScheduler scheduler = new TestScheduler(); + PublishSubject ps = PublishSubject.create(); + Observable> source = ps.buffer(100, TimeUnit.MILLISECONDS, 10, scheduler); + + TestSubscriber> ts = new TestSubscriber>(0); + + source.subscribe(ts); + + ps.onNext(1); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(MissingBackpressureException.class); + } + + @Test + public void testSingleRequestWithTimeout() { + TestScheduler scheduler = new TestScheduler(); + PublishSubject ps = PublishSubject.create(); + Observable> source = ps.buffer(100, TimeUnit.MILLISECONDS, 10, scheduler); + + TestSubscriber> ts = new TestSubscriber>(0); + + source.subscribe(ts); + + ts.requestMore(1); + + ps.onNext(1); + + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValue(Arrays.asList(1)); + ts.assertNoTerminalEvent(); + + ps.onNext(2); + ps.onNext(3); + + ts.assertValue(Arrays.asList(1)); + ts.assertNoTerminalEvent(); + } + @Test + public void testUpstreamOverflowBuffer() { + TestScheduler scheduler = new TestScheduler(); + PublishSubject ps = PublishSubject.create(); + Observable> source = ps.buffer(100, TimeUnit.MILLISECONDS, 10, scheduler); + + TestSubscriber> ts = new TestSubscriber>(0); + + source.subscribe(ts); + + ts.requestMore(1); + + ps.onNext(1); + + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValue(Arrays.asList(1)); + ts.assertNoTerminalEvent(); + + for (int i = 2; i <= 11; i++) { + ps.onNext(i); + } + + ts.assertValue(Arrays.asList(1)); + ts.assertError(MissingBackpressureException.class); + } +}