diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 02efb20f3f..38edc0a68f 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -15,25 +15,17 @@ */ package rx.internal.operators; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Observable.Operator; -import rx.exceptions.*; -import rx.Observer; -import rx.Producer; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Func1; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.Observable.*; +import rx.functions.*; +import rx.internal.producers.ProducerArbiter; +import rx.internal.util.*; import rx.observables.GroupedObservable; -import rx.subjects.Subject; +import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; /** @@ -49,381 +41,523 @@ * @param * the value type of the groups */ -public class OperatorGroupBy implements Operator, T> { +public final class OperatorGroupBy implements Operator, T>{ final Func1 keySelector; - final Func1 valueSelector; + final Func1 valueSelector; + final int bufferSize; + final boolean delayError; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public OperatorGroupBy(Func1 keySelector) { + this(keySelector, (Func1)UtilityFunctions.identity(), RxRingBuffer.SIZE, false); + } - @SuppressWarnings("unchecked") - public OperatorGroupBy(final Func1 keySelector) { - this(keySelector, (Func1) IDENTITY); + public OperatorGroupBy(Func1 keySelector, Func1 valueSelector) { + this(keySelector, valueSelector, RxRingBuffer.SIZE, false); } - public OperatorGroupBy( - Func1 keySelector, - Func1 valueSelector) { + public OperatorGroupBy(Func1 keySelector, Func1 valueSelector, int bufferSize, boolean delayError) { this.keySelector = keySelector; this.valueSelector = valueSelector; + this.bufferSize = bufferSize; + this.delayError = delayError; } - + @Override - public Subscriber call(final Subscriber> child) { - return new GroupBySubscriber(keySelector, valueSelector, child); - } - - static final class GroupBySubscriber extends Subscriber { - private static final int MAX_QUEUE_SIZE = 1024; - final GroupBySubscriber self = this; - final Func1 keySelector; - final Func1 elementSelector; - final Subscriber> child; + public Subscriber call(Subscriber> t) { + final GroupBySubscriber parent = new GroupBySubscriber(t, keySelector, valueSelector, bufferSize, delayError); - // We should not call `unsubscribe()` until `groups.isEmpty() && child.isUnsubscribed()` is true. - // Use `WIP_FOR_UNSUBSCRIBE_UPDATER` to monitor these statuses and call `unsubscribe()` properly. - // Should check both when `child.unsubscribe` is called and any group is removed. - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wipForUnsubscribe"); - volatile int wipForUnsubscribe = 1; - - public GroupBySubscriber( - Func1 keySelector, - Func1 elementSelector, - Subscriber> child) { - super(); - this.keySelector = keySelector; - this.elementSelector = elementSelector; - this.child = child; - child.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self) == 0) { - self.unsubscribe(); - } - } - - })); - } - - private static class GroupState { - private final Subject s = BufferUntilSubscriber.create(); - private final AtomicLong requested = new AtomicLong(); - private final AtomicLong count = new AtomicLong(); - private final Queue buffer = new ConcurrentLinkedQueue(); // TODO should this be lazily created? - - public Observable getObservable() { - return s; + t.add(Subscriptions.create(new Action0() { + @Override + public void call() { + parent.cancel(); } + })); - public Observer getObserver() { - return s; - } + t.setProducer(parent.producer); + + return parent; + } + public static final class GroupByProducer implements Producer { + final GroupBySubscriber parent; + + public GroupByProducer(GroupBySubscriber parent) { + this.parent = parent; } - - private final ConcurrentHashMap> groups = new ConcurrentHashMap>(); - - private static final NotificationLite nl = NotificationLite.instance(); - - volatile int completionEmitted; - - private static final int UNTERMINATED = 0; - private static final int TERMINATED_WITH_COMPLETED = 1; - private static final int TERMINATED_WITH_ERROR = 2; - - // Must be one of `UNTERMINATED`, `TERMINATED_WITH_COMPLETED`, `TERMINATED_WITH_ERROR` - volatile int terminated = UNTERMINATED; - - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater COMPLETION_EMITTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "completionEmitted"); + @Override + public void request(long n) { + parent.requestMore(n); + } + } + + public static final class GroupBySubscriber + extends Subscriber { + final Subscriber> actual; + final Func1 keySelector; + final Func1 valueSelector; + final int bufferSize; + final boolean delayError; + final Map> groups; + final Queue> queue; + final GroupByProducer producer; + + static final Object NULL_KEY = new Object(); + + final ProducerArbiter s; + + volatile int cancelled; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater TERMINATED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "terminated"); + static final AtomicIntegerFieldUpdater CANCELLED = + AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "cancelled"); volatile long requested; @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "requested"); - - volatile long bufferedCount; + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "requested"); + + volatile int groupCount; @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater BUFFERED_COUNT = AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "bufferedCount"); + static final AtomicIntegerFieldUpdater GROUP_COUNT = + AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "groupCount"); + + Throwable error; + volatile boolean done; + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wip"); + + public GroupBySubscriber(Subscriber> actual, Func1 keySelector, Func1 valueSelector, int bufferSize, boolean delayError) { + this.actual = actual; + this.keySelector = keySelector; + this.valueSelector = valueSelector; + this.bufferSize = bufferSize; + this.delayError = delayError; + this.groups = new ConcurrentHashMap>(); + this.queue = new ConcurrentLinkedQueue>(); + GROUP_COUNT.lazySet(this, 1); + this.s = new ProducerArbiter(); + this.s.request(bufferSize); + this.producer = new GroupByProducer(this); + } + @Override - public void onStart() { - REQUESTED.set(this, MAX_QUEUE_SIZE); - request(MAX_QUEUE_SIZE); + public void setProducer(Producer s) { + this.s.setProducer(s); } - + @Override - public void onCompleted() { - if (TERMINATED_UPDATER.compareAndSet(this, UNTERMINATED, TERMINATED_WITH_COMPLETED)) { - // if we receive onCompleted from our parent we onComplete children - // for each group check if it is ready to accept more events if so pass the oncomplete through else buffer it. - for (GroupState group : groups.values()) { - emitItem(group, nl.completed()); - } - - // special case (no groups emitted ... or all unsubscribed) - if (groups.isEmpty()) { - // we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted - if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) { - child.onCompleted(); - } - } + public void onNext(T t) { + if (done) { + return; } - } - @Override - public void onError(Throwable e) { - if (TERMINATED_UPDATER.compareAndSet(this, UNTERMINATED, TERMINATED_WITH_ERROR)) { - // It's safe to access all groups and emit the error. - // onNext and onError are in sequence so no group will be created in the loop. - for (GroupState group : groups.values()) { - emitItem(group, nl.error(e)); - } - try { - // we immediately tear everything down if we receive an error - child.onError(e); - } finally { - // We have not chained the subscribers, so need to call it explicitly. - unsubscribe(); + final Queue> q = this.queue; + final Subscriber> a = this.actual; + + K key; + try { + key = keySelector.call(t); + } catch (Throwable ex) { + unsubscribe(); + errorAll(a, q, ex); + return; + } + + boolean notNew = true; + Object mapKey = key != null ? key : NULL_KEY; + GroupedUnicast group = groups.get(mapKey); + if (group == null) { + // if the main has been cancelled, stop creating groups + // and skip this value + if (cancelled == 0) { + group = GroupedUnicast.createWith(key, bufferSize, this, delayError); + groups.put(mapKey, group); + + GROUP_COUNT.getAndIncrement(this); + + notNew = false; + q.offer(group); + drain(); + } else { + return; } } - } + + V v; + try { + v = valueSelector.call(t); + } catch (Throwable ex) { + unsubscribe(); + errorAll(a, q, ex); + return; + } - // The grouped observable propagates the 'producer.request' call from it's subscriber to this method - // Here we keep track of the requested count for each group - // If we already have items queued when a request comes in we vend those and decrement the outstanding request count + group.onNext(v); - void requestFromGroupedObservable(long n, GroupState group) { - BackpressureUtils.getAndAddRequest(group.requested, n); - if (group.count.getAndIncrement() == 0) { - pollQueue(group); + if (notNew) { + s.request(1); } } - - private Object groupedKey(K key) { - return key == null ? NULL_KEY : key; + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + return; + } + error = t; + done = true; + GROUP_COUNT.decrementAndGet(this); + drain(); } - - @SuppressWarnings("unchecked") - private K getKey(Object groupedKey) { - return groupedKey == NULL_KEY ? null : (K) groupedKey; + + @Override + public void onCompleted() { + if (done) { + return; + } + done = true; + GROUP_COUNT.decrementAndGet(this); + drain(); } - @Override - public void onNext(T t) { - try { - final Object key = groupedKey(keySelector.call(t)); - GroupState group = groups.get(key); - if (group == null) { - // this group doesn't exist - if (child.isUnsubscribed()) { - // we have been unsubscribed on the outer so won't send any more groups - return; - } - group = createNewGroup(key); + public void requestMore(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + + BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + drain(); + } + + public void cancel() { + // cancelling the main source means we don't want any more groups + // but running groups still require new values + if (CANCELLED.compareAndSet(this, 0, 1)) { + if (GROUP_COUNT.decrementAndGet(this) == 0) { + unsubscribe(); } - if (group != null) { - emitItem(group, nl.next(t)); + } + } + + public void cancel(K key) { + Object mapKey = key != null ? key : NULL_KEY; + if (groups.remove(mapKey) != null) { + if (GROUP_COUNT.decrementAndGet(this) == 0) { + unsubscribe(); } - } catch (Throwable e) { - Exceptions.throwOrReport(e, this, t); } } - - private GroupState createNewGroup(final Object key) { - final GroupState groupState = new GroupState(); - - GroupedObservable go = GroupedObservable.create(getKey(key), new OnSubscribe() { - - @Override - public void call(final Subscriber o) { - o.setProducer(new Producer() { - - @Override - public void request(long n) { - requestFromGroupedObservable(n, groupState); - } - - }); - - final AtomicBoolean once = new AtomicBoolean(); - - groupState.getObservable().doOnUnsubscribe(new Action0() { - - @Override - public void call() { - if (once.compareAndSet(false, true)) { - // done once per instance, either onComplete or onUnSubscribe - cleanupGroup(key); - } - } - - }).unsafeSubscribe(new Subscriber(o) { - @Override - public void onCompleted() { - o.onCompleted(); - // eagerly cleanup instead of waiting for unsubscribe - if (once.compareAndSet(false, true)) { - // done once per instance, either onComplete or onUnSubscribe - cleanupGroup(key); - } - } - - @Override - public void onError(Throwable e) { - o.onError(e); - // eagerly cleanup instead of waiting for unsubscribe - if (once.compareAndSet(false, true)) { - // done once per instance, either onComplete or onUnSubscribe - cleanupGroup(key); - } - } - - @Override - public void onNext(T t) { - try { - o.onNext(elementSelector.call(t)); - } catch (Throwable e) { - Exceptions.throwOrReport(e, this, t); - } - } - }); + + void drain() { + if (WIP.getAndIncrement(this) != 0) { + return; + } + + int missed = 1; + + final Queue> q = this.queue; + final Subscriber> a = this.actual; + + for (;;) { + + if (checkTerminated(done, q.isEmpty(), a, q)) { + return; } - }); + + long r = requested; + boolean unbounded = r == Long.MAX_VALUE; + long e = 0L; + + while (r != 0) { + boolean d = done; + + GroupedObservable t = q.poll(); + + boolean empty = t == null; + + if (checkTerminated(d, empty, a, q)) { + return; + } + + if (empty) { + break; + } - GroupState putIfAbsent; - for (;;) { - int wip = wipForUnsubscribe; - if (wip <= 0) { - return null; + a.onNext(t); + + r--; + e--; + } + + if (e != 0L) { + if (!unbounded) { + REQUESTED.addAndGet(this, e); + } + s.request(-e); } - if (WIP_FOR_UNSUBSCRIBE_UPDATER.compareAndSet(this, wip, wip + 1)) { - putIfAbsent = groups.putIfAbsent(key, groupState); + + missed = WIP.addAndGet(this, -missed); + if (missed == 0) { break; } } - if (putIfAbsent != null) { - // this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug - throw new IllegalStateException("Group already existed while creating a new one"); + } + + void errorAll(Subscriber> a, Queue q, Throwable ex) { + q.clear(); + List> list = new ArrayList>(groups.values()); + groups.clear(); + + for (GroupedUnicast e : list) { + e.onError(ex); } - child.onNext(go); - - return groupState; + + a.onError(ex); } - - private void cleanupGroup(Object key) { - GroupState removed; - removed = groups.remove(key); - if (removed != null) { - if (!removed.buffer.isEmpty()) { - BUFFERED_COUNT.addAndGet(self, -removed.buffer.size()); + + boolean checkTerminated(boolean d, boolean empty, + Subscriber> a, Queue q) { + if (d) { + Throwable err = error; + if (err != null) { + errorAll(a, q, err); + return true; + } else + if (empty) { + List> list = new ArrayList>(groups.values()); + groups.clear(); + + for (GroupedUnicast e : list) { + e.onComplete(); + } + + actual.onCompleted(); + return true; } - completeInner(); - // since we may have unsubscribed early with items in the buffer - // we remove those above and have freed up room to request more - // so give it a chance to request more now - requestMoreIfNecessary(); } + return false; } + } + + static final class GroupedUnicast extends GroupedObservable { + + public static GroupedUnicast createWith(K key, int bufferSize, GroupBySubscriber parent, boolean delayError) { + State state = new State(bufferSize, parent, key, delayError); + return new GroupedUnicast(key, state); + } + + final State state; + + protected GroupedUnicast(K key, State state) { + super(key, state); + this.state = state; + } + + public void onNext(T t) { + state.onNext(t); + } + + public void onError(Throwable e) { + state.onError(e); + } + + public void onComplete() { + state.onComplete(); + } + } + + static final class State extends AtomicInteger implements Producer, Subscription, OnSubscribe { + /** */ + private static final long serialVersionUID = -3852313036005250360L; + + final K key; + final Queue queue; + final GroupBySubscriber parent; + final boolean delayError; + + volatile long requested; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(State.class, "requested"); + + volatile boolean done; + Throwable error; + + volatile int cancelled; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater CANCELLED = + AtomicIntegerFieldUpdater.newUpdater(State.class, "cancelled"); + + volatile Subscriber actual; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater ACTUAL = + AtomicReferenceFieldUpdater.newUpdater(State.class, Subscriber.class, "actual"); - private void emitItem(GroupState groupState, Object item) { - Queue q = groupState.buffer; - AtomicLong keyRequested = groupState.requested; - //don't need to check for requested being Long.MAX_VALUE because this - //field is capped at MAX_QUEUE_SIZE - REQUESTED.decrementAndGet(this); - // short circuit buffering - if (keyRequested != null && keyRequested.get() > 0 && (q == null || q.isEmpty())) { - @SuppressWarnings("unchecked") - Observer obs = (Observer)groupState.getObserver(); - nl.accept(obs, item); - if (keyRequested.get() != Long.MAX_VALUE) { - // best endeavours check (no CAS loop here) because we mainly care about - // the initial request being Long.MAX_VALUE and that value being conserved. - keyRequested.decrementAndGet(); - } - } else { - q.add(item); - BUFFERED_COUNT.incrementAndGet(this); - - if (groupState.count.getAndIncrement() == 0) { - pollQueue(groupState); - } + volatile int once; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(State.class, "once"); + + + public State(int bufferSize, GroupBySubscriber parent, K key, boolean delayError) { + this.queue = new ConcurrentLinkedQueue(); + this.parent = parent; + this.key = key; + this.delayError = delayError; + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= required but it was " + n); + } + if (n != 0L) { + BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + drain(); } - requestMoreIfNecessary(); } - - private void pollQueue(GroupState groupState) { - do { - drainIfPossible(groupState); - long c = groupState.count.decrementAndGet(); - if (c > 1) { - - /* - * Set down to 1 and then iterate again. - * we lower it to 1 otherwise it could have grown very large while in the last poll loop - * and then we can end up looping all those times again here before existing even once we've drained - */ - groupState.count.set(1); - // we now loop again, and if anything tries scheduling again after this it will increment and cause us to loop again after + + @Override + public boolean isUnsubscribed() { + return cancelled != 0; + } + + @Override + public void unsubscribe() { + if (CANCELLED.compareAndSet(this, 0, 1)) { + if (getAndIncrement() == 0) { + parent.cancel(key); } - } while (groupState.count.get() > 0); + } + } + + @Override + public void call(Subscriber s) { + if (ONCE.compareAndSet(this, 0, 1)) { + s.add(this); + s.setProducer(this); + ACTUAL.lazySet(this, s); + drain(); + } else { + s.onError(new IllegalStateException("Only one Subscriber allowed!")); + } } - private void requestMoreIfNecessary() { - if (REQUESTED.get(this) == 0 && terminated == 0) { - long toRequest = MAX_QUEUE_SIZE - BUFFERED_COUNT.get(this); - if (toRequest > 0 && REQUESTED.compareAndSet(this, 0, toRequest)) { - request(toRequest); - } + public void onNext(T t) { + if (t == null) { + error = new NullPointerException(); + done = true; + } else { + queue.offer(NotificationLite.instance().next(t)); } + drain(); + } + + public void onError(Throwable e) { + error = e; + done = true; + drain(); + } + + public void onComplete() { + done = true; + drain(); } - private void drainIfPossible(GroupState groupState) { - while (groupState.requested.get() > 0) { - Object t = groupState.buffer.poll(); - if (t != null) { - @SuppressWarnings("unchecked") - Observer obs = (Observer)groupState.getObserver(); - nl.accept(obs, t); - if (groupState.requested.get()!=Long.MAX_VALUE) { - // best endeavours check (no CAS loop here) because we mainly care about - // the initial request being Long.MAX_VALUE and that value being conserved. - groupState.requested.decrementAndGet(); + void drain() { + if (getAndIncrement() != 0) { + return; + } + int missed = 1; + + final Queue q = queue; + final boolean delayError = this.delayError; + Subscriber a = actual; + NotificationLite nl = NotificationLite.instance(); + for (;;) { + if (a != null) { + if (checkTerminated(done, q.isEmpty(), a, delayError)) { + return; } - BUFFERED_COUNT.decrementAndGet(this); - - // if we have used up all the events we requested from upstream then figure out what to ask for this time based on the empty space in the buffer - requestMoreIfNecessary(); - } else { - // queue is empty break + + long r = requested; + boolean unbounded = r == Long.MAX_VALUE; + long e = 0; + + while (r != 0L) { + boolean d = done; + Object v = q.poll(); + boolean empty = v == null; + + if (checkTerminated(d, empty, a, delayError)) { + return; + } + + if (empty) { + break; + } + + a.onNext(nl.getValue(v)); + + r--; + e--; + } + + if (e != 0L) { + if (!unbounded) { + REQUESTED.addAndGet(this, e); + } + parent.s.request(-e); + } + } + + missed = addAndGet(-missed); + if (missed == 0) { break; } + if (a == null) { + a = actual; + } } } - - private void completeInner() { - // A group is removed, so check if we need to call `unsubscribe` - if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this) == 0) { - // It means `groups.isEmpty() && child.isUnsubscribed()` is true - unsubscribe(); - } else if (groups.isEmpty() && terminated == TERMINATED_WITH_COMPLETED) { - // if we have no outstanding groups (all completed or unsubscribe) and terminated on outer - // completionEmitted ensures we only emit onCompleted once - if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) { - child.onCompleted(); + + boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError) { + if (cancelled != 0) { + queue.clear(); + parent.cancel(key); + return true; + } + + if (d) { + if (delayError) { + if (empty) { + Throwable e = error; + if (e != null) { + a.onError(e); + } else { + a.onCompleted(); + } + return true; + } + } else { + Throwable e = error; + if (e != null) { + queue.clear(); + a.onError(e); + return true; + } else + if (empty) { + a.onCompleted(); + return true; + } } } + + return false; } - } - - private final static Func1 IDENTITY = new Func1() { - @Override - public Object call(Object t) { - return t; - } - }; - - private static final Object NULL_KEY = new Object(); } diff --git a/src/test/java/rx/internal/operators/OperatorGroupByTest.java b/src/test/java/rx/internal/operators/OperatorGroupByTest.java index b14b7ad373..57cfdbcf4a 100644 --- a/src/test/java/rx/internal/operators/OperatorGroupByTest.java +++ b/src/test/java/rx/internal/operators/OperatorGroupByTest.java @@ -15,45 +15,24 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.MockitoAnnotations; - -import rx.Notification; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.*; +import org.mockito.*; + +import rx.*; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; -import rx.Subscriber; -import rx.Subscription; import rx.exceptions.TestException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.internal.util.UtilityFunctions; +import rx.functions.*; +import rx.internal.util.*; import rx.observables.GroupedObservable; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -1501,4 +1480,105 @@ public void onNext(Integer t) { }}); assertTrue(completed.get()); } + + /** + * Issue #3425. + * + * The problem is that a request of 1 may create a new group, emit to the desired group + * or emit to a completely different group. In this test, the merge requests N which + * must be produced by the range, however it will create a bunch of groups before the actual + * group receives a value. + */ + @Test + public void testBackpressureObserveOnOuter() { + for (int j = 0; j < 1000; j++) { + Observable.merge( + Observable.range(0, 500) + .groupBy(new Func1() { + @Override + public Object call(Integer i) { + return i % (RxRingBuffer.SIZE + 2); + } + }) + .observeOn(Schedulers.computation()) + ).toBlocking().last(); + } + } + + /** + * Synchronous verification of issue #3425. + */ + @Test + public void testBackpressureInnerDoesntOverflowOuter() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1, 2) + .groupBy(new Func1() { + @Override + public Object call(Integer v) { + return v; + } + }) + .doOnNext(new Action1>() { + @Override + public void call(GroupedObservable g) { + // this will request Long.MAX_VALUE + g.subscribe(); + } + }) + // this won't request anything just yet + .subscribe(ts) + ; + ts.requestMore(1); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertValueCount(1); + } + + @Test + public void testOneGroupInnerRequestsTwiceBuffer() { + TestSubscriber ts1 = TestSubscriber.create(0); + final TestSubscriber ts2 = TestSubscriber.create(0); + + Observable.range(1, RxRingBuffer.SIZE * 2) + .groupBy(new Func1() { + @Override + public Object call(Integer v) { + return 1; + } + }) + .doOnNext(new Action1>() { + @Override + public void call(GroupedObservable g) { + g.subscribe(ts2); + } + }) + .subscribe(ts1); + + ts1.assertNoValues(); + ts1.assertNoErrors(); + ts1.assertNotCompleted(); + + ts2.assertNoValues(); + ts2.assertNoErrors(); + ts2.assertNotCompleted(); + + ts1.requestMore(1); + + ts1.assertValueCount(1); + ts1.assertNoErrors(); + ts1.assertNotCompleted(); + + ts2.assertNoValues(); + ts2.assertNoErrors(); + ts2.assertNotCompleted(); + + ts2.requestMore(RxRingBuffer.SIZE * 2); + + ts2.assertValueCount(RxRingBuffer.SIZE * 2); + ts2.assertNoErrors(); + ts2.assertNotCompleted(); + } + } diff --git a/src/test/java/rx/internal/operators/OperatorRetryTest.java b/src/test/java/rx/internal/operators/OperatorRetryTest.java index 146ee3c254..dc6eb510a9 100644 --- a/src/test/java/rx/internal/operators/OperatorRetryTest.java +++ b/src/test/java/rx/internal/operators/OperatorRetryTest.java @@ -874,6 +874,7 @@ public void call(Subscriber o) { }); origin.retry() + .onBackpressureBuffer() // FIXME the new GroupBy won't request enough for this particular test and retry overflows .groupBy(new Func1() { @Override public String call(String t1) {