Skip to content

2.x: resilience in the parallel() operators #5128

@akarnokd

Description

@akarnokd

When the architecture of the parallel() subsystem was designed, the goal was to provide means for efficient parallel computations over recovering from failures in any or most of the parallel 'rails' in it. This resulted fail-fast implementation and little to no means (beyond flatMap) to prevent the death of the whole parallel computation due to a single failing component.

Adding the usual onErrorResumeNext or retry doesn't work because the core driver, parallel() can't support resubscription on its individual rails and even if, a failure behind a runOn almost certainly would drop any unprocessed elements in its queue.

The given toolset, ParallelFlowable already gives the opportunity to include resilience at the cost of overhead: flatMap and/or concatMap. As with the traditional flows, the inner Flowables to these can have all sorts of operators not available on parallel() itself and are confined to the particular 'rail', thus retry()/onErrorResumeNext can be applied on a per item basis:

Flowable.range(1, 100)
.parallel(20)
.runOn(Schedulers.io())
.concatMap(index -> api.getImage(index).onErrorResumeNext(Flowable.just(notAvailableImage)))
.sequential()
.observeOn(guiThread)
.subscribe(...)

On the other hand, parallel-map/doOnNext (among others) don't offer any ignore/resume option out of box and unless the developer applies try-catch to prevent crashing the 'rail'.

To enable finer gained control in this situation, I propose the following overloads:

  • enum FailureHandling: stop, error, skip, retry
  • map(Function<T, R>, FailureHandling)
  • map(Function<T, R>, Function<Throwable, FailureHandling>)
  • filter(Predicate<T>, FailureHandling)
  • filter(Predicate<T>, Function<Throwable, FailureHandling>)
  • doOnNext(Action<T>, FailureHandling)
  • doOnNext(Action<T>, Function<Throwable, FailureHandling>)

They should cover the the cases where the Function hosts a blocking call (if there was a need for timed backoff, the concatMap approach is still available).

Thoughts?

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions