Merge Limiting Concurrency - Can Deadlock #3156

Closed
benjchristensen opened this issue Aug 14, 2015 · 3 comments

Comments

@benjchristensen
Member

Since the merge changes in 1.0.13, merge has a bug that limits concurrency and can cause an async "deadlock", because not all of the Observables being merged are subscribed to.

These unit tests show the issue:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.junit.Test;

    import rx.Observable;
    import rx.observers.TestSubscriber;

    public class MergeConcurrencyTest {

        @Test
        public void testUnboundedDefaultConcurrency() {
            // 2000 sources that never emit, followed by one that emits 0..99
            List<Observable<Integer>> os = new ArrayList<Observable<Integer>>();
            for(int i=0; i < 2000; i++) {
                os.add(Observable.<Integer>never());
            }
            os.add(Observable.range(0, 100));

            TestSubscriber<Integer> ts = TestSubscriber.create();
            Observable.merge(os).take(1).subscribe(ts);
            ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
            ts.assertValue(0);
            ts.assertCompleted();
        }

        @Test
        public void testConcurrencyLimit() {
            List<Observable<Integer>> os = new ArrayList<Observable<Integer>>();
            for(int i=0; i < 2000; i++) {
                os.add(Observable.<Integer>never());
            }
            os.add(Observable.range(0, 100));

            TestSubscriber<Integer> ts = TestSubscriber.create();
            // explicit maxConcurrent: every source should still be subscribed
            Observable.merge(os, Integer.MAX_VALUE).take(1).subscribe(ts);
            ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
            ts.assertValue(0);
            ts.assertCompleted();
        }
    }

Surprisingly, the issue still happens even when the maxConcurrent value is provided.

We have bounded vertical buffers (items queued per stream), but horizontal buffers (how many streams are in flight at once) must always default to unbounded, since it is the user's code that defines how many items are buffered horizontally. This affects both merge and groupBy. The maxConcurrent overload allows a developer to limit the horizontal buffering, and for groupBy the developer controls the key selector.
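
A minimal model of why the horizontal default matters, in plain Java without RxJava. `HorizontalLimitModel` and `firstMergedValue` are hypothetical names, and the model is a simplification, not the merge implementation: a merge keeps at most `maxConcurrent` sources subscribed and frees a slot only when a source completes. With 2000 `never()`-like sources ahead of a ready one, any effective limit of 2000 or less means the ready source is never subscribed, which matches the async "deadlock" the tests exercise. The model does not try to explain why 1.0.13 hit a lower limit even when maxConcurrent was passed; it only shows the consequence.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code. Each source is represented by the first value it
// would emit right after being subscribed, or null for a source like Observable.never().
final class HorizontalLimitModel {

    /** First value the modeled merge emits, or null if it can never emit anything. */
    static Integer firstMergedValue(List<Integer> sources, int maxConcurrent) {
        int activeSlots = 0;
        for (Integer firstValue : sources) {
            if (activeSlots == maxConcurrent) {
                // Every slot is held by a source that never completes, so the remaining
                // sources are never subscribed: the async "deadlock".
                return null;
            }
            if (firstValue != null) {
                return firstValue; // a ready source emits right after subscription
            }
            activeSlots++; // a never()-like source keeps its slot forever
        }
        return null; // only never()-like sources
    }

    public static void main(String[] args) {
        List<Integer> sources = new ArrayList<Integer>();
        for (int i = 0; i < 2000; i++) {
            sources.add(null); // like Observable.never()
        }
        sources.add(0); // like Observable.range(0, 100)
        System.out.println(firstMergedValue(sources, 128));               // null
        System.out.println(firstMergedValue(sources, Integer.MAX_VALUE)); // 0
    }
}
```

With a limit of 128 (an arbitrary example value) the modeled merge never emits; with Integer.MAX_VALUE it emits 0, as an unbounded horizontal default should.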

@akarnokd
Member

This shortcoming goes back much further, at least as far as RxRingBuffer and a certain scalar optimization in merge...

@benjchristensen
Member Author

Scalars are different: they aren't async, so they aren't an issue. A scalar ALWAYS has a value in it, so it will always emit once there is downstream demand.

These unit tests pass if I go back to 1.0.12:

    @Test
    public void testUnboundedDefaultConcurrency() {
        List<Observable<Integer>> os = new ArrayList<Observable<Integer>>();
        for(int i=0; i < 2000; i++) {
            os.add(Observable.<Integer>never());
        }
        os.add(Observable.range(0, 100));       

        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        Observable.merge(os).take(1).subscribe(ts);
        ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
        ts.assertValue(0);
        ts.assertCompleted();
    }

    @Test
    public void testConcurrencyLimit() {
        List<Observable<Integer>> os = new ArrayList<Observable<Integer>>();
        for(int i=0; i < 2000; i++) {
            os.add(Observable.<Integer>never());
        }
        os.add(Observable.range(0, 100));       

        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        Observable.merge(os, 5000).take(1).subscribe(ts); // reduced to 5000 instead of MAX_VALUE since it causes an OOM
        ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
        ts.assertValue(0);
        ts.assertCompleted();
    }

@akarnokd
Member

In 1.0.12, if there were scalars among the input sources, subscribing was held off until some scalars were drained (i.e., in case of an async downstream request): given 1 normal source and 128 scalars queued up, the next source wasn't requested until some or all of the scalars were drained.

For the tests to pass, we need an SPSC queue instead of the RxRingBuffer if maxConcurrency is below the maximum; otherwise, we need an unbounded SPSC queue. JCTools' SpscArrayQueue supports only power-of-2 capacities, so a backpressure violation wouldn't be detected at item 5000 but only at item 8192. SpscUnboundedArrayQueue works only if its capacity is at least some minimum value (8 with defaults), due to a bug in its lookahead-grow logic.
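
The power-of-2 rounding can be sketched as follows. `Pow2Sizing` is a hypothetical helper written for illustration, not the JCTools API; it rounds a requested capacity up to the next power of two so a slot index can be computed with a bit mask, which is why a requested capacity of 5000 ends up with 8192 slots.

```java
// Hypothetical helper, not the JCTools API: sizing of a mask-indexed ring buffer.
final class Pow2Sizing {

    /** Smallest power of two >= value; valid for 1 <= value <= 2^30. */
    static int roundToPowerOfTwo(int value) {
        if (value < 1 || value > (1 << 30)) {
            throw new IllegalArgumentException("value out of range: " + value);
        }
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    public static void main(String[] args) {
        int requested = 5000;
        int allocated = roundToPowerOfTwo(requested);
        int mask = allocated - 1; // slot index = sequence & mask
        System.out.println(allocated);             // 8192
        System.out.println(allocated - requested); // 3192 extra slots before "full"
        System.out.println(8191 & mask);           // 8191 (last slot)
        System.out.println(8192 & mask);           // 0 (wraps around)
    }
}
```

A queue that reports "full" only when its array is exhausted therefore accepts 3192 more items than the requested 5000 before it notices.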

The v14 merge has the side effect of limiting the size of the tracking array, so each copy-on-write add/remove has to copy at most 512 bytes.
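
Copy-on-write tracking means every add or remove of an inner subscriber copies the whole array, so the cost per change grows with the number of tracked sources; capping that number caps the copy. A minimal sketch, assuming lock-guarded writers and lock-free readers of a volatile snapshot (`CopyOnWriteTracker` and its methods are hypothetical names, not RxJava's merge internals):

```java
import java.util.Arrays;

// Hypothetical copy-on-write tracking array, not RxJava's merge code.
// Writers copy the whole array under a lock; readers use the volatile snapshot without locking.
final class CopyOnWriteTracker<T> {
    private static final Object[] EMPTY = new Object[0];
    private volatile Object[] items = EMPTY;

    synchronized void add(T item) {
        Object[] current = items;
        Object[] next = Arrays.copyOf(current, current.length + 1); // O(n) copy per add
        next[current.length] = item;
        items = next;
    }

    synchronized boolean remove(T item) {
        Object[] current = items;
        int index = -1;
        for (int i = 0; i < current.length; i++) {
            if (current[i] == item) {
                index = i;
                break;
            }
        }
        if (index < 0) {
            return false;
        }
        Object[] next = new Object[current.length - 1]; // O(n) copy per remove
        System.arraycopy(current, 0, next, 0, index);
        System.arraycopy(current, index + 1, next, index, current.length - index - 1);
        items = next;
        return true;
    }

    /** Snapshot for readers such as a drain loop; never mutated after publication. */
    Object[] snapshot() {
        return items;
    }
}
```

Old snapshots stay valid after a change, which is what lets a drain loop iterate without holding the lock.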

I happen to have a platform-safe SpscArrayQueue with explicit capacity enforcement (though it still has a power-of-2 memory cost) and a platform-safe SpscLinkedArrayQueue (still sensitive to certain capacity settings), courtesy of my rsi flatMap.

I can't do a PR for a few days, but this is a simple change (provided the tradeoff is understood) that most of the community should be able to pull off.
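
A minimal sketch of explicit capacity enforcement on top of power-of-2 storage, assuming exactly one producer thread and one consumer thread. `CappedSpscArrayQueue` is a hypothetical name, not akarnokd's queue or a JCTools class; it uses `java.util.concurrent.atomic` instead of `sun.misc.Unsafe`, which is assumed here to be what "platform safe" means. The array is still rounded up to a power of two, but `offer` rejects items once `capacity` are outstanding, so a requested capacity of 5000 is reported full at the 5001st item rather than after 8192.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical bounded SPSC queue: power-of-2 storage, exact logical capacity.
// Exactly one producer thread may call offer() and exactly one consumer thread poll().
final class CappedSpscArrayQueue<T> {
    private final AtomicReferenceArray<T> buffer;
    private final int mask;
    private final int capacity;
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();

    CappedSpscArrayQueue(int capacity) {
        if (capacity < 1 || capacity > (1 << 30)) {
            throw new IllegalArgumentException("capacity out of range: " + capacity);
        }
        this.capacity = capacity;
        int length = 1 << (32 - Integer.numberOfLeadingZeros(capacity - 1)); // next power of 2
        this.buffer = new AtomicReferenceArray<T>(length);
        this.mask = length - 1;
    }

    /** Producer side: returns false once `capacity` items are queued, even if slots remain. */
    boolean offer(T value) {
        if (value == null) {
            throw new NullPointerException("null values are not allowed");
        }
        long p = producerIndex.get();
        if (p - consumerIndex.get() >= capacity) {
            return false; // explicit capacity check instead of "next slot is occupied"
        }
        buffer.lazySet((int) p & mask, value);
        producerIndex.lazySet(p + 1); // publish only after the element is written
        return true;
    }

    /** Consumer side: returns null when the queue is empty. */
    T poll() {
        long c = consumerIndex.get();
        if (c == producerIndex.get()) {
            return null;
        }
        int index = (int) c & mask;
        T value = buffer.get(index);
        buffer.lazySet(index, null);
        consumerIndex.lazySet(c + 1); // hand the slot back to the producer
        return value;
    }

    int allocatedLength() {
        return buffer.length();
    }
}
```

The tradeoff mentioned above is visible here: the logical limit is exact, but memory is still allocated for 8192 slots.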
