Skip to content

Bounded replay() request coordination doesn't work properly with latecommers #3452

@akarnokd

Description

@akarnokd

Replay works by requesting as many items as the largest requester subscriber does. So if there is one with request(2) and another with request(5), replay will request(5) from the upstream. To be precise, subscriber requests are accumulated and the operator requests the difference between the last known max and the latest known max. This way, if there is an additional request(10) from the first subscriber above, replay will request only 5 additional elements.

This works for the unbounded replay() because all subscribers start from the very beginning. However, if the buffer is bounded, this differencing doesn't work anymore. The following unit test fails with both 1.x and 2.x implementations.

@Test
public void testSubscribersComeAndGoAtRequestBoundaries() {
    ConnectableObservable<Integer> source = Observable.range(1, 10).replay(1);
    source.connect();

    TestSubscriber<Integer> ts1 = TestSubscriber.create(2);

    source.subscribe(ts1);

    ts1.assertValues(1, 2);
    ts1.assertNoErrors();
    ts1.unsubscribe();

    TestSubscriber<Integer> ts2 = TestSubscriber.create(2);

    source.subscribe(ts2);

    ts2.assertValues(2, 3);
    ts2.assertNoErrors();
    ts2.unsubscribe();

    TestSubscriber<Integer> ts3 = TestSubscriber.create();

    source.subscribe(ts3);

    ts3.assertNoErrors();
    ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10);
    ts3.assertCompleted();
}

What happens here is that even though ts2 subscribes after 2 elements were requested from source, it only receives the very last and replay() doesn't request 1 more.

The idea about fixing this is that the total requested amount of late subscribers would start from a "current index", i.e., the number of items received by replay() so far.

This approach would work in this synchronous test but may not work with asynchronous subscribers. The problem is that the start node and the index has to be atomically updated together so a subscriber "pins" both at the same time: the continuous delivery is guaranteed as well as the proper total requested amount. I'll investigate to make this happen.

Let me emphasize again that the unbounded replay() works properly and the v2 ReplaySubject isn't affected because it is unbounded-in.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions