Skip to content

Commit cba5952

Browse files
committed
1.x: fix: bounded replay() not requesting enough for latecommers
1 parent f9d3e99 commit cba5952

File tree

2 files changed

+142
-15
lines changed

2 files changed

+142
-15
lines changed

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,10 @@ public void call(Subscriber<? super T> child) {
221221
// the producer has been registered with the current subscriber-to-source so
222222
// at least it will receive the next terminal event
223223
child.add(inner);
224+
225+
// pin the head of the buffer here, shouldn't affect anything else
226+
r.buffer.replay(inner);
227+
224228
// setting the producer will trigger the first request to be considered by
225229
// the subscriber-to-source.
226230
child.setProducer(inner);
@@ -858,9 +862,15 @@ public void replay(InnerProducer<T> output) {
858862
static final class Node extends AtomicReference<Node> {
859863
/** */
860864
private static final long serialVersionUID = 245354315435971818L;
865+
866+
/** The contained value. */
861867
final Object value;
862-
public Node(Object value) {
868+
/** The absolute index of the value. */
869+
final long index;
870+
871+
public Node(Object value, long index) {
863872
this.value = value;
873+
this.index = index;
864874
}
865875
}
866876

@@ -878,9 +888,12 @@ static class BoundedReplayBuffer<T> extends AtomicReference<Node> implements Rep
878888
Node tail;
879889
int size;
880890

891+
/** The total number of received values so far. */
892+
long index;
893+
881894
public BoundedReplayBuffer() {
882895
nl = NotificationLite.instance();
883-
Node n = new Node(null);
896+
Node n = new Node(null, 0);
884897
tail = n;
885898
set(n);
886899
}
@@ -929,23 +942,23 @@ final void setFirst(Node n) {
929942
@Override
930943
public final void next(T value) {
931944
Object o = enterTransform(nl.next(value));
932-
Node n = new Node(o);
945+
Node n = new Node(o, ++index);
933946
addLast(n);
934947
truncate();
935948
}
936949

937950
@Override
938951
public final void error(Throwable e) {
939952
Object o = enterTransform(nl.error(e));
940-
Node n = new Node(o);
953+
Node n = new Node(o, ++index);
941954
addLast(n);
942955
truncateFinal();
943956
}
944957

945958
@Override
946959
public final void complete() {
947960
Object o = enterTransform(nl.completed());
948-
Node n = new Node(o);
961+
Node n = new Node(o, ++index);
949962
addLast(n);
950963
truncateFinal();
951964
}
@@ -965,15 +978,25 @@ public final void replay(InnerProducer<T> output) {
965978
}
966979

967980
long r = output.get();
968-
long r0 = r;
981+
boolean unbounded = r == Long.MAX_VALUE;
969982
long e = 0L;
970983

971984
Node node = output.index();
972985
if (node == null) {
973986
node = get();
974987
output.index = node;
988+
989+
/*
990+
* Since this is a latecommer, fix its total requested amount
991+
* as if it got all the values up to the node.index
992+
*/
993+
output.addTotalRequested(node.index);
975994
}
976-
995+
996+
if (output.isUnsubscribed()) {
997+
return;
998+
}
999+
9771000
while (r != 0) {
9781001
Node v = node.get();
9791002
if (v != null) {
@@ -993,6 +1016,7 @@ public final void replay(InnerProducer<T> output) {
9931016
return;
9941017
}
9951018
e++;
1019+
r--;
9961020
node = v;
9971021
} else {
9981022
break;
@@ -1004,7 +1028,7 @@ public final void replay(InnerProducer<T> output) {
10041028

10051029
if (e != 0L) {
10061030
output.index = node;
1007-
if (r0 != Long.MAX_VALUE) {
1031+
if (!unbounded) {
10081032
output.produced(e);
10091033
}
10101034
}

src/test/java/rx/internal/operators/OperatorReplayTest.java

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -749,11 +749,11 @@ public boolean isUnsubscribed() {
749749
@Test
750750
public void testBoundedReplayBuffer() {
751751
BoundedReplayBuffer<Integer> buf = new BoundedReplayBuffer<Integer>();
752-
buf.addLast(new Node(1));
753-
buf.addLast(new Node(2));
754-
buf.addLast(new Node(3));
755-
buf.addLast(new Node(4));
756-
buf.addLast(new Node(5));
752+
buf.addLast(new Node(1, 0));
753+
buf.addLast(new Node(2, 1));
754+
buf.addLast(new Node(3, 2));
755+
buf.addLast(new Node(4, 3));
756+
buf.addLast(new Node(5, 4));
757757

758758
List<Integer> values = new ArrayList<Integer>();
759759
buf.collect(values);
@@ -768,8 +768,8 @@ public void testBoundedReplayBuffer() {
768768
buf.collect(values);
769769
Assert.assertTrue(values.isEmpty());
770770

771-
buf.addLast(new Node(5));
772-
buf.addLast(new Node(6));
771+
buf.addLast(new Node(5, 5));
772+
buf.addLast(new Node(6, 6));
773773
buf.collect(values);
774774

775775
Assert.assertEquals(Arrays.asList(5, 6), values);
@@ -1145,4 +1145,107 @@ public void call(Long t) {
11451145
Assert.assertEquals(Arrays.asList(5L, 5L), requests);
11461146
}
11471147

1148+
@Test
1149+
public void testSubscribersComeAndGoAtRequestBoundaries() {
1150+
ConnectableObservable<Integer> source = Observable.range(1, 10).replay(1);
1151+
source.connect();
1152+
1153+
TestSubscriber<Integer> ts1 = TestSubscriber.create(2);
1154+
1155+
source.subscribe(ts1);
1156+
1157+
ts1.assertValues(1, 2);
1158+
ts1.assertNoErrors();
1159+
ts1.unsubscribe();
1160+
1161+
TestSubscriber<Integer> ts2 = TestSubscriber.create(2);
1162+
1163+
source.subscribe(ts2);
1164+
1165+
ts2.assertValues(2, 3);
1166+
ts2.assertNoErrors();
1167+
ts2.unsubscribe();
1168+
1169+
TestSubscriber<Integer> ts21 = TestSubscriber.create(1);
1170+
1171+
source.subscribe(ts21);
1172+
1173+
ts21.assertValues(3);
1174+
ts21.assertNoErrors();
1175+
ts21.unsubscribe();
1176+
1177+
TestSubscriber<Integer> ts22 = TestSubscriber.create(1);
1178+
1179+
source.subscribe(ts22);
1180+
1181+
ts22.assertValues(3);
1182+
ts22.assertNoErrors();
1183+
ts22.unsubscribe();
1184+
1185+
1186+
TestSubscriber<Integer> ts3 = TestSubscriber.create();
1187+
1188+
source.subscribe(ts3);
1189+
1190+
ts3.assertNoErrors();
1191+
System.out.println(ts3.getOnNextEvents());
1192+
ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10);
1193+
ts3.assertCompleted();
1194+
}
1195+
1196+
@Test
1197+
public void testSubscribersComeAndGoAtRequestBoundaries2() {
1198+
ConnectableObservable<Integer> source = Observable.range(1, 10).replay(2);
1199+
source.connect();
1200+
1201+
TestSubscriber<Integer> ts1 = TestSubscriber.create(2);
1202+
1203+
source.subscribe(ts1);
1204+
1205+
ts1.assertValues(1, 2);
1206+
ts1.assertNoErrors();
1207+
ts1.unsubscribe();
1208+
1209+
TestSubscriber<Integer> ts11 = TestSubscriber.create(2);
1210+
1211+
source.subscribe(ts11);
1212+
1213+
ts11.assertValues(1, 2);
1214+
ts11.assertNoErrors();
1215+
ts11.unsubscribe();
1216+
1217+
TestSubscriber<Integer> ts2 = TestSubscriber.create(3);
1218+
1219+
source.subscribe(ts2);
1220+
1221+
ts2.assertValues(1, 2, 3);
1222+
ts2.assertNoErrors();
1223+
ts2.unsubscribe();
1224+
1225+
TestSubscriber<Integer> ts21 = TestSubscriber.create(1);
1226+
1227+
source.subscribe(ts21);
1228+
1229+
ts21.assertValues(2);
1230+
ts21.assertNoErrors();
1231+
ts21.unsubscribe();
1232+
1233+
TestSubscriber<Integer> ts22 = TestSubscriber.create(1);
1234+
1235+
source.subscribe(ts22);
1236+
1237+
ts22.assertValues(2);
1238+
ts22.assertNoErrors();
1239+
ts22.unsubscribe();
1240+
1241+
1242+
TestSubscriber<Integer> ts3 = TestSubscriber.create();
1243+
1244+
source.subscribe(ts3);
1245+
1246+
ts3.assertNoErrors();
1247+
System.out.println(ts3.getOnNextEvents());
1248+
ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10);
1249+
ts3.assertCompleted();
1250+
}
11481251
}

0 commit comments

Comments
 (0)