Skip to content

Issus about Flowable concatMapDelayError #6520

@LiuShuaiQ

Description

@LiuShuaiQ

RxJava version 2.2.9.

I feel doubt about Flowable#concatMapDelayError, when I use Flowable#fromCallable

The following code:

   Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32)
        .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
          @Override public Flowable<Integer> apply(final Integer integer) throws Exception {
            return Flowable.fromCallable(new Callable<Integer>() {
              @Override public Integer call() throws Exception {
                if (integer >= 100) {
                  throw new NullPointerException("test null exp");
                }
                return integer;
              }
            });
          }
        })
        .subscribe(new Consumer<Integer>() {
          @Override public void accept(Integer integer) throws Exception {
            Log.e(TAG, "test-->complete-" + integer);
          }
        }, new Consumer<Throwable>() {
          @Override public void accept(Throwable throwable) throws Exception {
            Log.e(TAG, "test-->error-" + throwable);
          }
        });

The following output:

  test-->complete-1
  test-->complete-2
  test-->complete-3
  test-->error-java.lang.NullPointerException: test null exp

It do not delay error. But I use Observable is success delayed the error, And I use Flowable#create is success delayed the error.

The following code:

    Observable.just(1, 2, 3, 101, 102, 23, 890, 120, 32)
        .concatMapDelayError(new Function<Integer, Observable<Integer>>() {
          @Override public Observable<Integer> apply(final Integer integer) throws Exception {
            return Observable.fromCallable(new Callable<Integer>() {
              @Override public Integer call() throws Exception {
                if (integer >= 100) {
                  throw new NullPointerException("test null exp");
                }
                return integer;
              }
            });
          }
        })
        .subscribe(new Consumer<Integer>() {
          @Override public void accept(Integer integer) throws Exception {
            Log.e(TAG, "test-->complete-" + integer);
          }
        }, new Consumer<Throwable>() {
          @Override public void accept(Throwable throwable) throws Exception {
            Log.e(TAG, "test-->error-" + throwable);
          }
        });

The following output:

  test-->complete-1
  test-->complete-2
  test-->complete-3
  test-->complete-23
  test-->complete-32
  test-->error-io.reactivex.exceptions.CompositeException: 4 exceptions occurred. 

The following code:

    Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32)
        .concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
          @Override public Flowable<Integer> apply(final Integer integer) throws Exception {
            return Flowable.create(new FlowableOnSubscribe<Integer>() {
              @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                if (integer >= 100) {
                  throw new NullPointerException("test null exp");
                }
                emitter.onNext(integer);
                emitter.onComplete();
              }
            }, BackpressureStrategy.BUFFER);
          }
        })
        .subscribe(new Consumer<Integer>() {
          @Override public void accept(Integer integer) throws Exception {
            Log.e(TAG, "test-->complete-" + integer);
          }
        }, new Consumer<Throwable>() {
          @Override public void accept(Throwable throwable) throws Exception {
            Log.e(TAG, "test-->error-" + throwable);
          }
        });

The following output:

  test-->complete-1
  test-->complete-2
  test-->complete-3
  test-->complete-23
  test-->complete-32
  test-->error-io.reactivex.exceptions.CompositeException: 4 exceptions occurred. 

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions