diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 93d78ee14b..a76f2f3c0b 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -221,6 +221,10 @@ public void call(Subscriber child) { // the producer has been registered with the current subscriber-to-source so // at least it will receive the next terminal event child.add(inner); + + // pin the head of the buffer here, shouldn't affect anything else + r.buffer.replay(inner); + // setting the producer will trigger the first request to be considered by // the subscriber-to-source. child.setProducer(inner); @@ -858,9 +862,15 @@ public void replay(InnerProducer output) { static final class Node extends AtomicReference { /** */ private static final long serialVersionUID = 245354315435971818L; + + /** The contained value. */ final Object value; - public Node(Object value) { + /** The absolute index of the value. */ + final long index; + + public Node(Object value, long index) { this.value = value; + this.index = index; } } @@ -878,9 +888,12 @@ static class BoundedReplayBuffer extends AtomicReference implements Rep Node tail; int size; + /** The total number of received values so far. */ + long index; + public BoundedReplayBuffer() { nl = NotificationLite.instance(); - Node n = new Node(null); + Node n = new Node(null, 0); tail = n; set(n); } @@ -929,7 +942,7 @@ final void setFirst(Node n) { @Override public final void next(T value) { Object o = enterTransform(nl.next(value)); - Node n = new Node(o); + Node n = new Node(o, ++index); addLast(n); truncate(); } @@ -937,7 +950,7 @@ public final void next(T value) { @Override public final void error(Throwable e) { Object o = enterTransform(nl.error(e)); - Node n = new Node(o); + Node n = new Node(o, ++index); addLast(n); truncateFinal(); } @@ -945,7 +958,7 @@ public final void error(Throwable e) { @Override public final void complete() { Object o = enterTransform(nl.completed()); - Node n = new Node(o); + Node n = new Node(o, ++index); addLast(n); truncateFinal(); } @@ -965,15 +978,25 @@ public final void replay(InnerProducer output) { } long r = output.get(); - long r0 = r; + boolean unbounded = r == Long.MAX_VALUE; long e = 0L; Node node = output.index(); if (node == null) { node = get(); output.index = node; + + /* + * Since this is a latecommer, fix its total requested amount + * as if it got all the values up to the node.index + */ + output.addTotalRequested(node.index); } - + + if (output.isUnsubscribed()) { + return; + } + while (r != 0) { Node v = node.get(); if (v != null) { @@ -993,6 +1016,7 @@ public final void replay(InnerProducer output) { return; } e++; + r--; node = v; } else { break; @@ -1004,7 +1028,7 @@ public final void replay(InnerProducer output) { if (e != 0L) { output.index = node; - if (r0 != Long.MAX_VALUE) { + if (!unbounded) { output.produced(e); } } diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index c0ec384d84..3da35b83b8 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -749,11 +749,11 @@ public boolean isUnsubscribed() { @Test public void testBoundedReplayBuffer() { BoundedReplayBuffer buf = new BoundedReplayBuffer(); - buf.addLast(new Node(1)); - buf.addLast(new Node(2)); - buf.addLast(new Node(3)); - buf.addLast(new Node(4)); - buf.addLast(new Node(5)); + buf.addLast(new Node(1, 0)); + buf.addLast(new Node(2, 1)); + buf.addLast(new Node(3, 2)); + buf.addLast(new Node(4, 3)); + buf.addLast(new Node(5, 4)); List values = new ArrayList(); buf.collect(values); @@ -768,8 +768,8 @@ public void testBoundedReplayBuffer() { buf.collect(values); Assert.assertTrue(values.isEmpty()); - buf.addLast(new Node(5)); - buf.addLast(new Node(6)); + buf.addLast(new Node(5, 5)); + buf.addLast(new Node(6, 6)); buf.collect(values); Assert.assertEquals(Arrays.asList(5, 6), values); @@ -1145,4 +1145,107 @@ public void call(Long t) { Assert.assertEquals(Arrays.asList(5L, 5L), requests); } + @Test + public void testSubscribersComeAndGoAtRequestBoundaries() { + ConnectableObservable source = Observable.range(1, 10).replay(1); + source.connect(); + + TestSubscriber ts1 = TestSubscriber.create(2); + + source.subscribe(ts1); + + ts1.assertValues(1, 2); + ts1.assertNoErrors(); + ts1.unsubscribe(); + + TestSubscriber ts2 = TestSubscriber.create(2); + + source.subscribe(ts2); + + ts2.assertValues(2, 3); + ts2.assertNoErrors(); + ts2.unsubscribe(); + + TestSubscriber ts21 = TestSubscriber.create(1); + + source.subscribe(ts21); + + ts21.assertValues(3); + ts21.assertNoErrors(); + ts21.unsubscribe(); + + TestSubscriber ts22 = TestSubscriber.create(1); + + source.subscribe(ts22); + + ts22.assertValues(3); + ts22.assertNoErrors(); + ts22.unsubscribe(); + + + TestSubscriber ts3 = TestSubscriber.create(); + + source.subscribe(ts3); + + ts3.assertNoErrors(); + System.out.println(ts3.getOnNextEvents()); + ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10); + ts3.assertCompleted(); + } + + @Test + public void testSubscribersComeAndGoAtRequestBoundaries2() { + ConnectableObservable source = Observable.range(1, 10).replay(2); + source.connect(); + + TestSubscriber ts1 = TestSubscriber.create(2); + + source.subscribe(ts1); + + ts1.assertValues(1, 2); + ts1.assertNoErrors(); + ts1.unsubscribe(); + + TestSubscriber ts11 = TestSubscriber.create(2); + + source.subscribe(ts11); + + ts11.assertValues(1, 2); + ts11.assertNoErrors(); + ts11.unsubscribe(); + + TestSubscriber ts2 = TestSubscriber.create(3); + + source.subscribe(ts2); + + ts2.assertValues(1, 2, 3); + ts2.assertNoErrors(); + ts2.unsubscribe(); + + TestSubscriber ts21 = TestSubscriber.create(1); + + source.subscribe(ts21); + + ts21.assertValues(2); + ts21.assertNoErrors(); + ts21.unsubscribe(); + + TestSubscriber ts22 = TestSubscriber.create(1); + + source.subscribe(ts22); + + ts22.assertValues(2); + ts22.assertNoErrors(); + ts22.unsubscribe(); + + + TestSubscriber ts3 = TestSubscriber.create(); + + source.subscribe(ts3); + + ts3.assertNoErrors(); + System.out.println(ts3.getOnNextEvents()); + ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10); + ts3.assertCompleted(); + } } \ No newline at end of file