Skip to content

1.x: Merging an observable of singles. #4988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

abersnaze
Copy link
Contributor

The addition of a Observable<Single<T>> -> Observable<T> to round out the basic API of rx.Single. I need this for doing a flat scan of sorts.

Single.merge(events.scan(Single.just(seed), (stateSingle, event) -> {
    return stateSingle.flatMap((state) -> {
        return state.write(event);
    }).cache();
}));

@codecov-io
Copy link

codecov-io commented Jan 12, 2017

Current coverage is 83.54% (diff: 57.57%)

Merging #4988 into 1.x will decrease coverage by 0.73%

@@                1.x      #4988   diff @@
==========================================
  Files           288        289     +1   
  Lines         17806      18235   +429   
  Methods           0          0          
  Messages          0          0          
  Branches       2698       2782    +84   
==========================================
+ Hits          15007      15234   +227   
- Misses         1950       2096   +146   
- Partials        849        905    +56   

Powered by Codecov. Last update 3716747...1008acd

@davidmoten
Copy link
Collaborator

Looks like a big copy and paste from an existing merge operator. Can you summarize the differences from the original tried and true operator to help review? I also wonder if we can share a decent chunk of code between operators.

@abersnaze
Copy link
Contributor Author

It was a copy for the most part but for changing the class names. It could have be done by mapping the Observable<Single<T>> to an Observable<Observable<T>>.

Copy link
Member

@akarnokd akarnokd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reuse the existing merge operator.

* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends Single<T>> source) {
return source.lift(OperatorMergeSingle.<T> instance(false));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be

return source.map(new Func1<Single<T>, Observable<T>() {
    @Override public Observable<T> call(Single<T> t) {
        return t.toObservable();
    }
}).lift(OperatorMerge.<T>instance(false));

* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends Single<T>> source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends Single<T>> source, int maxConcurrent) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<T>> source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<T>> source, int maxConcurrent) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

@abersnaze
Copy link
Contributor Author

@akarnokd actually do you think that the back pressure could be streamlined because of the guarantee that they'll be one onNext for each subscription? Does it even need a custom Producer if we got rid of the maxConcurrent overloads?

@akarnokd
Copy link
Member

The v2 version is optimized.

@akarnokd
Copy link
Member

if we got rid of the maxConcurrent overloads

You may not want to have a million outstanding Single-based network request. Limiting flatMap's concurrency is expected here where backpressure helps naturally.

@akarnokd akarnokd added this to the 1.3 milestone Jan 17, 2017
@akarnokd akarnokd changed the title Merging an observable of singles. 1.x: Merging an observable of singles. Jan 19, 2017
@akarnokd
Copy link
Member

Any progress on this?

@akarnokd
Copy link
Member

Closing via #5092.

@akarnokd akarnokd closed this Feb 11, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants