Skip to content

Observable<Void> Usage or VoidObservable #3037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
NiteshKant opened this issue Jun 20, 2015 · 23 comments
Closed

Observable<Void> Usage or VoidObservable #3037

NiteshKant opened this issue Jun 20, 2015 · 23 comments

Comments

@NiteshKant
Copy link
Member

While working on RxNetty, a networking library that layers RxJava on top of netty, I have come across a common case of expressing result of an operation as an Observable<Void>. A typical case being the result of a write on a Connection. The API for write looks like:

public Observable<Void> write(Observable<T> msgs);

The return value just represents the asynchronous result of the write and hence does not emit any item, so has the type as Void.

This works pretty well, however, it becomes awkward when we start doing a bit more complex operations over this write. eg: If I am writing a TCP client that sends a message on the connection and then reads the response from the server, I start doing things like:

connection.write(Observable.just("hello"))
                 .cast(String.class)
                 .concatWith(connection.getInput())

(Actual example here)

In this example, connection.getInput() returns an Observable<T> emitting items as read on the connection.

and in case, I do not want to wait for write to complete, the code looks like:

connection.write(Observable.just("hello"))
                 .cast(String.class)
                 .mergeWith(connection.getInput())

What is awkward in the above example, is the cast(String.class). It is completely safe as the write never emits an item (Void) so I can merge/concat heterogeneous streams but it is just awkward.

My motivation behind creating this issue is to first see if other folks have come across the same scenario, if so, how have they handled it. Secondly, if this does sound like something worth spending time to make it better, I would like to propose some abstraction on the lines of a VoidObservable (for the lack of a better name) which basically provides better constructs around:

  • Concat with a different type: write().cast().concatWith() example above, where it is expected to continue with another stream of a different type (String in the above example)
  • Merge with a different type: write().cast().mergeWith() example above, where it is expected to eagerly subscribe to the second Observable but listen only for errors from the Observable<Void>
@akarnokd
Copy link
Member

I see two problems:

Type inference limitations

The lack of cast-down type-inference in the Java language. For example: if I have an Observable<Double> a and an Observable<Integer> b, I can't use mergeWith() because it expects the parameter to extend the type of the source, which Double and Integer doesn't do to each other. You'd expect c = a.mergeWith(b) have a common supertype of Observable<Number> which type conversion can't be expressed in Java:

interface IObservable<T> {
    <U, R super T & U> IObservable<R> mergeWith(IObservable<U> other);
}

Here, super is not allowed and gives a compiler error.

If I try to cheat this out via extends I have to add a second type parameter to the observable:

interface IObservable<T extends R, R> {
    void subscribe(Subscriber<R> child);
    default <U extends R> IObservable<R, R> mergeWith(IObservable<U, R> other) {
        return null;
    }
}

This requires me to specify some common basetype so the merge can fall back onto it, however, it has limitations too:

IObservable<Double, Number> io1 = s -> {};
IObservable<Integer, Number> io2 = s -> {};
IObservable<Integer, Object> io3 = s -> {};

io1.mergeWith(io2);
io2.mergeWith(io1);
io1.mergeWith(io3); // won't compile, incompatible types.

The only thing that works is the static merge:

Observable<? extends Number> merge = Observable.merge(o1, o2);

But still, it forces the user to have ? extends Number carried everywhere. Rx.NET falls into this category so it seems to be a no-issue there and due to declaration-site variance, you don't have super and extends everywhere.

Operator methods are final

Even if you have your own, fixed type observable, the following won't compile because our operator methods are final:

public static final class VoidObservable extends Observable<Void> {

    protected VoidObservable(OnSubscribe<Void> onSubscribe) {
        super(onSubscribe);
    }

    public <T> Observable<T> mergeWith(Observable<T> other) {
        return null;
    }
}

The compiler will most likely complain about same-erasure non-overriding methods or just simply about override attempt of a final method. You'd have to name this method something else and remember to use it instead of the standard.

@abersnaze
Copy link
Contributor

I took a crack at changing RxNetty to use Observable<?> but ran into generics problems when it came to its usage of onErrorResumeNext(Func1). Now that we have doOnError(Action1) that doesn't have to return Observable<?> it might work.

@NiteshKant
Copy link
Member Author

@akarnokd @abersnaze Thanks for your comments, let me answer them one at a time.

I see two problems:

You are absolutely correct in identifying these issues & I should have clarified my intent better. What I was intending to is to introduce a class like:

public class VoidObservable extends Observable<Void> {

    protected VoidObservable(OnSubscribe<Void> f) {
        super(f);
    }

    public <T> Observable<T> continueWith(Observable<T> next) {
        final Observable<T> cast = unsafeCast();
        return cast.concatWith(next);
    }

    public <T> Observable<T> mergeErrorWith(Observable<T> next) {
        final Observable<T> cast = unsafeCast();
        return cast.mergeWith(next);
    }

    private <T> Observable<T> unsafeCast() {
        @SuppressWarnings("rawtypes")
        final Observable rawSource = this;
        @SuppressWarnings("unchecked")
        final Observable<T> cast = rawSource;
        return cast;
    }
}

and add any other methods specific to VoidObservable. This is basically clarifying the intent of the operation better than .cast().XWith().

Do you see this working or making sense?

I took a crack at changing RxNetty to use Observable<?>

Can you elaborate more i.e. were you trying to make RxNetty's Connection.write() return Observable<?> ?

@abersnaze
Copy link
Contributor

I took a crack at changing RxNetty to use Observable<?>

Can you elaborate more i.e. were you trying to make RxNetty's Connection.write() return Observable<?> ?

The observer of the return value from Connection.write() only cares about the terminal state/time of that Observable and therefore shouldn't put any restrictions on the type of data that it is ultimately going to ignore. If there are no restriction on the type then there wouldn't be any need for casting it to Observable<Void>.

@NiteshKant
Copy link
Member Author

@abersnaze I agree with you on why <?> could be better in terms of indicating that the subscriber should not care about the type, although in this context, wouldn't it have the same problem? If you keep aside the RxNetty connection and look at this simple case,

        Observable<?> source = Observable.just("Hi");
        source.concatWith(Observable.just(1));

The above isn't valid as Observable<?> can not be concatenated with Observable<Integer> in this case. So, you will have to do:

        Observable<?> source = Observable.just("Hi");
        source.cast(Integer.class).concatWith(Observable.just(1));

which is the same thing as Observable<Void>, correct?

@abersnaze
Copy link
Contributor

You wouldn't use concat because it doesn't make sense. Concat is concatenating the data you don't care about the data you are only using concat because enforces an order and propagation of errors. Here is code that does what you want.

Observable<?> source = Observable.just("Hi");
source.lift(new Operator<Integer, Object>() {
    @Override
    public Subscriber<? super Object> call(Subscriber<? super Integer> child) {
        return new Subscriber<Object>(child) {
            @Override
            public void onCompleted() {
                Observable.just(1).subscribe(child);
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(Object t) {
                // ignore data
            }
        };
    }
});

@NiteshKant
Copy link
Member Author

Sure, I think the point here is whether using <?> is any better than <Void>, I do not really see a difference in terms of applying any higher order functions on the Observable, which is the context for this discussion.

Now, if I take out <?> argument, I think the problem still is similar i.e. how to make the use case of continueWith and mergeErrorWith, I outlined above, more palatable to the users. Whether the actual implementation of continueWith is .cast().concatWith() or a specific operator like @abersnaze mentioned can be discussed later.

@benjchristensen
Copy link
Member

I don't think VoidObservable will work very well due to how generics behave. I can't see how it would make concatWith behave any better.

I think something like continueWith would work better, which by contract is saying that it ignores the generics of the previous stream and continues with the next stream onCompletion of the previous.

Observable<Void> someStream ...
Observable<T> otherStream ...
Observable<T> combined = someStream.continueWith(otherStream);

This achieves the goal of ignoring output (a 'Void' or ignoreElements()), ignoring the type (such as via cast(Class.class), and concatting when onComplete emits while also emitting onError.

@NiteshKant
Copy link
Member Author

I see, so what you are saying is add .continueWith and possible mergeError in Observable instead of creating an extension like VoidObservable.

The only ambiguity will be, what if, someone uses it with a non-void Observable? We can possibly do something like: raising an error if the first stream emits any element.

I am fine with either approaches, if and when we have a consensus here, I can submit a PR for this change!

@benjchristensen
Copy link
Member

Yes, it would behave like single() which emits an error if it emits an onNext.

@abersnaze
Copy link
Contributor

I'm okay with that restriction because if someone wanted to use the continueWith (I like switchEmpty more) but wanted to ensure the error doesn't happen then they can compose it with ignoreElements.

NiteshKant pushed a commit to NiteshKant/RxJava that referenced this issue Jul 2, 2015
As discussed in issue ReactiveX#3037, the primary use of these operators is to be applied to `Observable<Void>` so that they can be merged and concatenated with an Observable of a different type.

Both these operators raise an error if the source Observable emits any item.
@benjchristensen
Copy link
Member

Based on hallway discussions about this, we considered renaming for clarity to concatEmptyWith and mergeEmptyWith so we clearly describe the concat and merge cases.

Do you still prefer continueWith and mergeError?

@benjchristensen benjchristensen changed the title Case for a VoidObservable? Observable<Void> Usage or VoidObservable Jul 14, 2015
@NiteshKant
Copy link
Member Author

I updated the code In the PR to have the methods as concatEmptyWith and mergeEmptyWith but did not update the title of the PR.

@tonypiazza
Copy link

After wrestling with the issue of how to convert Observable<T> into Observable<Void>, I settled on using the compose method. While not ideal in terms of communicating intent, it does solve the problem in a simple way:

return asyncBucket
  .remove(doc)   // emits Observable<JsonDocument>
  .compose(new Transformer<JsonDocument, Void>() {
    @Override
    public Observable<Void> call(Observable<JsonDocument> removed) {
      return Observable.empty();
    }
});

@roman-mazur
Copy link

Well, @tonypiazza, it seems to me that if your asyncBucket.remove(doc) is a cold observable, your operation will never be performed since nobody subscribes to this observable. Do I miss something?
I used concatMap { Observable.<Void>empty(); } when I wanted to convert Observable to Observable.

@abersnaze
Copy link
Contributor

@tonypiazza, I believe @roman-mazur is right. By adding the concatEmptyWith and mergeEmptyWith to Observable we'll give the operators that RxNetty needs to allow @tonypiazza to return Observable<JsonDocument>.

@tonypiazza
Copy link

@roman-mazur and @abersnaze Thanks for your replies. I stand corrected. I tested my earlier solution and discovered that the document was not actually getting removed. By changing the code to use concatMap as suggested by @roman-mazur, it now works as expected. Here is the updated code that works:

return asyncBucket
  .remove(doc)   // emits Observable<JsonDocument>
  .concatMap(new Func1<JsonDocument, Void>() {
    @Override
    public Observable<Void> call(Observable<JsonDocument> removed) {
      return Observable.empty();
    }
});

So am I correct that the introduction of concatEmptyWith and mergeEmptyWith would allow me to do the following instead?

return asyncBucket
  .remove(doc)
  .concatEmptyWith(Observable.<Void>empty());

Let me know if my understanding is incorrect.

Thanks to everyone for a very helpful discussion.

@abersnaze
Copy link
Contributor

More like return asyncBucket.remove(doc).ignoreElements();

@tonypiazza
Copy link

@abersnaze The problem with ignoreElements() is that it doesn't change the return type. I want to return Observable<Void>.

NiteshKant pushed a commit to NiteshKant/RxJava that referenced this issue Jul 15, 2015
As discussed in issue ReactiveX#3037, the primary use of these operators is to be applied to `Observable<Void>` so that they can be merged and concatenated with an Observable of a different type.

Both these operators raise an error if the source Observable emits any item.

Review comments
@NiteshKant
Copy link
Member Author

So am I correct that the introduction of concatEmptyWith and mergeEmptyWith would allow me to do the following instead?

These new operators are intended to be used on an empty Observable i.e. the one that never emits an item. Typically such an Observable is Observable<Void>. However, in your case, you are trying to apply these operators on an Observable<JsonDocument>, which will result in an error emitted by these operators.

If you want to change the type of an existing Observable that is not empty, the available *map() functions are the way to go as pointed by @roman-mazur

NiteshKant pushed a commit to NiteshKant/RxJava that referenced this issue Oct 9, 2015
As discussed in issue ReactiveX#3037, the primary use of these operators is to be applied to `Observable<Void>` so that they can be merged and concatenated with an Observable of a different type.

Both these operators raise an error if the source Observable emits any item.
NiteshKant pushed a commit to NiteshKant/RxJava that referenced this issue Oct 9, 2015
As discussed in issue ReactiveX#3037, the primary use of these operators is to be applied to `Observable<Void>` so that they can be merged and concatenated with an Observable of a different type.

Both these operators raise an error if the source Observable emits any item.
@ibaca
Copy link

ibaca commented Oct 16, 2015

What do you think about this names? (I prefer not to force emptiness as commented here #3430)

then similar to promises 'then', second merged after first completed

connection.write(0x1234).cast(String.class).concatWith(connection.getInput())
connection.write(0x1234).then(connection.getInput())
<V> Observable<V> then(Observable<V>> v1) { return ((Observable<V>) ignoreElements()).concatWith(v1); }

flat as flatMap but ignore elements, so nothing to map.

connection.write(0x1234).cast(String.class).mergeWith(connection.getInput())
connection.write(0x1234).flat(connection.getInput())
<V> Observable<V> flat(Observable<V> v1) { return ((Observable<V>) ignoreElements()).mergeWith(v1); }

none as single but assert emptiness

*note: I think that concatWith is not working as expected in this talk because it subscribe to the first two elements (means, v1 is immediately subscribed), so you need to wrap the v1 into a defer(). I was wrong.

@akarnokd
Copy link
Member

*note: I think that concatWith is not working as expected in this talk because it subscribe to the first two elements (means, v1 is immediately subscribed), so you need to wrap the v1 into a defer().

Not sure what you mean by that. Concat has an internal buffer for two Observables but only subscribes to one of them. It requests 2 Observables from upstream and once the first completed, it requests another and also subscribes to the second Observable. In theory, this should establish a pipeline of Observables where there is one ready as soon as the previous one completes.

@NiteshKant
Copy link
Member Author

As mentioned in the PR, IMO, Completable is the correct abstraction for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

7 participants