2.x Flowable.flatMap maxConcurrency should be unbounded unless it is restricted #5126

@mitermayer

When flatMap is given more than 128 publishers and some of them never emit events, any messages sent by subsequent publishers are missed. This happens because the underlying implementation defaults maxConcurrency to bufferSize(), which is 128: https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/Flowable.java#L8239

According to the Javadoc at http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#flatMap(io.reactivex.functions.Function), this limit should not apply:

"The outer Publisher is consumed in unbounded mode"

Example to replicate the issue (assumes that publishers emits more than 128 items and that none of the first 128 publishers emits any message):

public Flowable<MyType> foo() {
    return publishers.flatMap(publisher -> {
        return publisher
            .subscribeTo("bar")
            .doOnSubscribe(s -> System.out.println(s))
            .doOnError(t -> System.out.println(t))
            .map(MyClass::someOperation)
            .takeUntil(publisher.onClose().toFlowable());
    });
}

At the moment, the workaround is to use the overloaded flatMap and manually set maxConcurrency to be unbounded.
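In RxJava 2.x, that workaround amounts to calling the flatMap(Function, int maxConcurrency) overload with Integer.MAX_VALUE, e.g. publishers.flatMap(mapper, Integer.MAX_VALUE). To illustrate why the default limit starves later publishers, here is a minimal, library-free sketch. It is only a simplified synchronous model, not RxJava's implementation: the Source type, merge, and sampleSources are hypothetical names introduced for this example, and each source is assumed to emit all of its items on subscription and then either complete or stay open forever.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of a concurrency-limited merge (NOT RxJava's actual code).
class FlatMapConcurrencyModel {

    /** Hypothetical inner source: emits its items on subscription, then completes or stays open. */
    static final class Source {
        final List<String> items;
        final boolean completes;

        Source(List<String> items, boolean completes) {
            this.items = items;
            this.completes = completes;
        }
    }

    /**
     * Subscribes to sources in order, keeping at most maxConcurrency of them active.
     * A slot is released only when its source completes.
     */
    static List<String> merge(List<Source> sources, int maxConcurrency) {
        List<String> received = new ArrayList<>();
        int active = 0;
        for (Source source : sources) {
            if (active == maxConcurrency) {
                // Every slot is held by a source that never completes,
                // so the remaining sources are never subscribed to.
                break;
            }
            active++;
            received.addAll(source.items);
            if (source.completes) {
                active--;
            }
        }
        return received;
    }

    /** 128 silent sources that never complete, followed by 72 that emit one item each. */
    static List<Source> sampleSources() {
        List<Source> sources = new ArrayList<>();
        for (int i = 0; i < 128; i++) {
            sources.add(new Source(Collections.<String>emptyList(), false));
        }
        for (int i = 128; i < 200; i++) {
            sources.add(new Source(Collections.singletonList("msg-" + i), true));
        }
        return sources;
    }

    public static void main(String[] args) {
        List<Source> sources = sampleSources();
        // Limit of 128 (the default bufferSize()): later sources are starved.
        System.out.println(merge(sources, 128).size());               // prints 0
        // Unbounded limit: every emitting source is reached.
        System.out.println(merge(sources, Integer.MAX_VALUE).size()); // prints 72
    }
}
```

With the limit at 128, the first 128 silent sources occupy every slot and nothing from the remaining 72 is delivered. With Integer.MAX_VALUE, all 72 messages arrive, which matches the behavior described for the workaround.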
