Skip to content

Opening closing #546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed

Opening closing #546

wants to merge 3 commits into from

Conversation

akarnokd
Copy link
Member

Issue #540

Removed Opening and Closing interfaces and updated method signatures accordingly.
Moved ObserverBase to rx.operators for now and added static methods to wrap or construct one. If usefull, it and other default implementations may be moved to a new public rx.observers package.

@cloudbees-pull-request-builder

RxJava-pull-requests #478 SUCCESS
This pull request looks good

@headinthebox
Copy link
Contributor

Nice improvement.

* Implements an observer that ensures proper event delivery
* semantics to its abstract onXxxxCore methods.
*/
public abstract class ObserverBase<T> implements Observer<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to replicate SafeObserver functionality: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/SafeObserver.java?source=cc

I tend to prefer not having it use inheritance as it's confusing to extend from this and then need to implement other method names than a normal Observer, otherwise the methods could be over-written by copy/paste and defeat the purpose. Thus I prefer composition, and it fits the Rx model better which uses create methods and other such things rather than inheritance.

This logic also is wanted: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/SafeObserver.java?source=cc#L103

This however is possibly a bug: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/SafeObserver.java?source=cc#L79

This may be a misinterpretation of Rx Design Guideline 4.3, but note the bold text in the following:

4.3. Assume resources are cleaned up after an OnError or OnCompleted message
Paragraph 4.1 states that no more messages should arrive after an OnError or OnCompleted message. This makes it possible to cleanup any resource used by the subscription the moment an OnError or OnCompleted arrives. Cleaning up resources immediately will make sure that any side-effect occurs in a predictable fashion. It also makes sure that the runtime can reclaim these resources.

In this sample the Using operator creates a resource that will be disposed upon unsubscription. The Rx contract for cleanup ensures that unsubscription will be called automatically once an OnError or OnCompleted message is sent.

And we can provide better constructors and factory methods: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/SafeObserver.java?source=cc#L64

Is this intended to be part of the public API or just for internal operators? Everything inside rx.operators is considered implementation details and not part of the public API. Note how the Javadoc excludes the rx.operators package: http://netflix.github.io/RxJava/javadoc/

@headinthebox I would appreciate your guidance on this.

@headinthebox
Copy link
Contributor

I noticed the duplication as well in one of the earlier comments.

Here is a bit of history form .NET. For a long time we promoted the fact that the way to create observers was via Observer.create. However, we found that many people did not understand this and we saw over and over again that people were implementing IObserver by hand, and breaking the contract (similar to what we saw with Subjects). In fact this even leaked into the MSDN docs at some point :-( (hope they have been removed).

So what we did is expose ObserverBase, which is what Observer.create is implemented in terms internally such that in case people do want to inherit from Observer, they would use this base class. The price is that you have to override the xxxCore methods, since the onXXX methods call those and wrap them in logic to maintain the contract.
(Note that .NET has ObservableBase for the same reason).

Technically, there may actually be good reasons to implement Observer using a class since you may want to share state between the methods.

So both of you are correct, this sounds like a cop-out, but it is really just bowing to reality.

Hope this helps,

Erik

@benjchristensen
Copy link
Member

The way we have solved it in RxJava is that we wrap Observer implementations passed in to Observable.subscribe when those implementations are not from the internal RxJava implementation itself.

Thus, I don't see the need to force users to use Observer.create or extend from ObserverBase. We retain the control of wrapping Observer implementations when we feel the need to and can change that implementation whenever we choose. It is simpler and doesn't leak those details into the public API.

@akarnokd
Copy link
Member Author

akarnokd commented Dec 5, 2013

In this case, I'd rather keep the ObserverBase in rx.operators package and not "advertise" it. JoinObserver1 needs to provide a specific API beyond Observer so wrapping it in a SafeObserver is not an option.

@headinthebox
Copy link
Contributor

@akarnokd Could you subclass SafeObserver? to minimize code duplication?

@benjchristensen Makes sense, as long as everywhere an external observer comes in, we are careful to see if it needs to be protected. IntellIj tells me that Observer appears +/- 300 times in a parameter position. Maybe this is a nice "grab it" issue we can add to the list. Probably most them them are OK, but just in case.

@akarnokd
Copy link
Member Author

akarnokd commented Dec 6, 2013

I'll split this into two PRs, one for the removal of the Opening and Closing, one for refactoring the ObserverBase.

@akarnokd akarnokd closed this Dec 6, 2013
@akarnokd
Copy link
Member Author

akarnokd commented Dec 6, 2013

@headinthebox I've tried it but it ended up a more convoluted solution. The core functionality became an inner class and the outer class needs to forward the API calls to this internal class. Basically, most of the behavior of SafeObserver had to be subverted.

public final class JoinObserver1<T> extends SafeObserver<T> implements JoinObserver {
    public static <T> JoinObserver1<T> create(Observable<T> source, Action1<Throwable> onError) {
        ObserverBaseCore<T> base = new ObserverBaseCore<T>(source, onError);
        return new JoinObserver1<T>(base);
    }
    private final ObserverBaseCore<T> base;
    private JoinObserver1(ObserverBaseCore<T> base) {
        super(new SafeObservableSubscription(Subscriptions.empty()), (Observer<T>)base);
        this.base = base;
    }
    public Queue<Notification<T>> queue() {
        return base.queue;
    }
    public void addActivePlan(ActivePlan0 activePlan) {
        base.activePlans.add(activePlan);
    }
    @Override
    public void subscribe(Object gate) {
        this.base.gate = gate;
        base.subscription.set(base.source.materialize().subscribe(base));
    }

    @Override
    public void dequeue() {
        base.queue.remove();
    }
    private static class ObserverBaseCore<T> implements Observer<Notification<T>> {
        private Object gate;
        private final Observable<T> source;
        private final Action1<Throwable> onError;
        private final List<ActivePlan0> activePlans;
        private final Queue<Notification<T>> queue;
        private final SingleAssignmentSubscription subscription;
        private volatile boolean done;
        public ObserverBaseCore(Observable<T> source, Action1<Throwable> onError) {
            this.source = source;
            this.onError = onError;
            queue = new LinkedList<Notification<T>>();
            subscription = new SingleAssignmentSubscription();
            activePlans = new ArrayList<ActivePlan0>();
        }
        @Override
        public void onNext(Notification<T> args) {
            synchronized (gate) {
                if (!done) {
                    if (args.isOnError()) {
                        onError.call(args.getThrowable());
                        return;
                    }
                    queue.add(args);

                    // remark: activePlans might change while iterating
                    for (ActivePlan0 a : new ArrayList<ActivePlan0>(activePlans)) {
                        a.match();
                    }
                }
            }
        }

        @Override
        public void onError(Throwable e) {
            // not expected
        }

        @Override
        public void onCompleted() {
            // not expected or ignored
        }        
    }
    @Override
    public void onError(Throwable e) {
        // not expected
    }

    @Override
    public void onCompleted() {
        // not expected or ignored
    }     
    void removeActivePlan(ActivePlan0 activePlan) {
        base.activePlans.remove(activePlan);
        if (base.activePlans.isEmpty()) {
            unsubscribe();
        }
    }

    @Override
    public void unsubscribe() {
        if (!base.done) {
            base.done = true;
            base.subscription.unsubscribe();
        }
    }

}

@akarnokd akarnokd deleted the OpeningClosing branch January 13, 2014 10:03
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants