concat not subscribing to the next source if requested is at 0 #3674

Closed
@akarnokd


The following test fails because the sequence misses the last value of the source:

    @Test
    public void publishConcatBackpressure() {
        TestSubscriber<Integer> ts = TestSubscriber.create(5);

        Observable.range(1, 6)
        .publish(o -> Observable.concat(o.take(5), o.takeLast(5)))
        .subscribe(ts);

        ts.requestMore(1);

        ts.assertValues(1, 2, 3, 4, 5, 6);
        ts.assertNoErrors();
        ts.assertCompleted();
    }

The publish() operator has the property that if all subscribers unsubscribe before the upstream has completed, it "slowly" consumes the upstream until a subscriber shows up (at which point it again consumes the upstream at the rate of the slowest subscriber). This property is debatable (i.e., why not "pause" until more interested parties show up), but when it is combined with concat, unexpected data loss may happen.

concat seems to be implemented in such a way that if a source completes on a request boundary, that is, when the downstream's requested count is zero, it doesn't subscribe to the next source immediately but only when the downstream requests more. This might be a residue from the case where the next source ignores backpressure (just?) and would flood a downstream that hasn't requested anything yet. However, such sources would ignore any request amount anyway, so this behavior might be unnecessary.

In the test above, this is simulated by an initial request of 5. By the time subscribe returns, however, publish() has already consumed the rest of the source, so when requestMore(1) makes concat subscribe to the second part, no value is available anymore. (Besides, it won't complete either, see #3673.)

I'm not sure whether these two behaviors are bugs or just unexpected but legal outcomes, so my question is what we should do about them. I believe tests exist that verify the current behavior of both concat and publish().

The fix to publish() would be to remove the n == 0 case in its drain loop, making it pause when nobody is interested; the fix to concat would be a different completion-subscription trampolining that subscribes to the next source immediately but doesn't request anything (or requests zero).

Either fix alone can ensure the continuity of values in the example above.
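To make the interaction easier to reason about, here is a toy model in plain Java with no RxJava dependency. It is only a sketch of the scenario described above, not the actual operator code: the class `PublishConcatModel`, the method `run`, the helper `subscribeTakeLast`, and both boolean flags are hypothetical names introduced for illustration. It assumes that takeLast(5) consumes its upstream without limit once subscribed and holds its buffered values until the downstream requests them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of range(1, 6).publish(o -> concat(o.take(5), o.takeLast(5)))
// with an initial downstream request of 5 followed by requestMore(1).
// Hypothetical sketch only; it does not mirror RxJava's internals.
class PublishConcatModel {

    static List<Integer> run(boolean publishDrainsWhenIdle, boolean concatSubscribesEagerly) {
        final int last = 6;      // range(1, 6) emits 1..6
        int next = 1;            // next value the shared source would emit
        int requested = 5;       // TestSubscriber.create(5)
        List<Integer> received = new ArrayList<>();

        // First inner source: take(5), limited by the downstream request.
        for (int taken = 0; taken < 5 && requested > 0; taken++) {
            received.add(next++);
            requested--;
        }

        // take(5) has completed exactly on the request boundary (requested == 0).
        Deque<Integer> tail = null;
        if (concatSubscribesEagerly) {
            // Proposed concat fix: subscribe right away, emit nothing yet.
            tail = subscribeTakeLast(next, last);
            next = last + 1;
        } else if (publishDrainsWhenIdle) {
            // Current publish behavior: no subscribers, so the rest is consumed and lost.
            next = last + 1;
        }

        requested += 1;          // ts.requestMore(1)
        if (tail == null) {
            // Current concat behavior: subscribe only once the downstream requests more.
            tail = subscribeTakeLast(next, last);
        }
        while (requested > 0 && !tail.isEmpty()) {
            received.add(tail.poll());
            requested--;
        }
        return received;
    }

    // Models takeLast(5): buffers whatever is still available, keeping at most 5 values.
    static Deque<Integer> subscribeTakeLast(int from, int last) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (int v = from; v <= last; v++) {
            buffer.add(v);
            if (buffer.size() > 5) {
                buffer.poll();
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println("current behavior:    " + run(true, false));
        System.out.println("publish pauses:      " + run(false, false));
        System.out.println("concat eager:        " + run(true, true));
    }
}
```

In this model, `run(true, false)` stands for the current combination and yields only 1 through 5, while changing either flag yields 1 through 6, which matches the claim that each fix is sufficient on its own. The model ignores completion signalling, so it says nothing about #3673.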

Thoughts?
