Unsubscribed observer receives event #3850

Closed
alosich opened this issue Apr 11, 2016 · 7 comments

@alosich

alosich commented Apr 11, 2016

This code:

import rx.Observable;
import rx.Subscription;
import rx.subjects.PublishSubject;

final class Unsubscribing {

   private static Subscription mFirst;
   private static Subscription mSecond;

   public static void main(final String[] args) {
      final PublishSubject<Integer> publisher = PublishSubject.create();

      mFirst = publisher.subscribe(i -> {
            mSecond.unsubscribe();
            log("first: " + i);
         } );

      mSecond = publisher.subscribe(i -> {
            log("second: " + i);
         } );


      publisher.onNext(1);
      publisher.onNext(2);
   }


   private static void log(final String msg) {
      System.out.println(msg);
   }

}

unsubscribes the second subscription inside the first callback. By the time the second callback is called, it is no longer subscribed (the unsubscribe call for it has already completed).

The second callback is expected not to be called, but it is.

The output of that program is:

first: 1
second: 1
first: 2

alosich changed the title from "Unsubscribed observer subscribes event." to "Unsubscribed observer receives event" on Apr 11, 2016

@akarnokd
Member

This is due to the best-effort nature of unsubscribing. To keep overhead as low as possible, PublishSubject doesn't check isUnsubscribed before calling onNext on a child, and the dispatch loop works on a snapshot of the currently available subscribers. Therefore, if the first subscriber unsubscribes the second, the change is not visible to the current PublishSubject.onNext call; it takes effect only from the next call.
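
To illustrate the snapshot dispatch described above, here is a minimal, self-contained sketch. `MiniSubject` and `Handle` are hypothetical stand-ins, not RxJava classes, and the real PublishSubject internals differ; the sketch only assumes that dispatch iterates over a copy of the subscriber list taken at the start of onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SnapshotDispatchDemo {

    /** Hypothetical stand-in for rx.Subscription (unsubscribe only). */
    interface Handle {
        void unsubscribe();
    }

    /** Hypothetical, simplified subject; NOT the real PublishSubject. */
    static final class MiniSubject<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        Handle subscribe(Consumer<T> onNext) {
            subscribers.add(onNext);
            // Unsubscribing removes the callback from the live list.
            return () -> subscribers.remove(onNext);
        }

        void onNext(T value) {
            // Snapshot taken once per dispatch: removals that happen while
            // the loop is running do not affect the current round.
            List<Consumer<T>> snapshot = new ArrayList<>(subscribers);
            for (Consumer<T> subscriber : snapshot) {
                subscriber.accept(value);
            }
        }
    }

    /** Replays the scenario from the report and returns what was logged. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSubject<Integer> subject = new MiniSubject<>();
        Handle[] second = new Handle[1];

        subject.subscribe(i -> {
            second[0].unsubscribe();
            log.add("first: " + i);
        });
        second[0] = subject.subscribe(i -> log.add("second: " + i));

        subject.onNext(1);
        subject.onNext(2);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "first: 1", "second: 1", "first: 2", the same sequence as the program in the report: the second callback is already in the snapshot for the first event and only disappears from the next dispatch on.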

@akarnokd
Member

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@alosich
Author

alosich commented Jul 21, 2016

@akarnokd from the point of view of the observer pattern, which Rx is based on, isn't this behavior incorrect?

I mean that, at the level of the abstraction, the "subscribed" and "unsubscribed" states differ only in whether the observer continues to receive events. "Subscribed" == "receives events", "unsubscribed" == "does not receive events".

Is this something that will be fixed, or does user code need additional checks to ensure that an unsubscribed subscriber does not get events?

@akarnokd
Member

Yes, you should check mSecond.isUnsubscribed() in this case.
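
One way to apply such a check without repeating it in every callback is a small wrapper. The sketch below is plain Java and does not depend on RxJava; `guarded` is a hypothetical helper name, and the AtomicBoolean merely stands in for the subscription's unsubscribed state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

final class GuardedCallbackDemo {

    /** Hypothetical helper: skips the delegate once the supplier reports "unsubscribed". */
    static <T> Consumer<T> guarded(BooleanSupplier isUnsubscribed, Consumer<T> delegate) {
        return value -> {
            if (isUnsubscribed.getAsBoolean()) {
                return; // late event after unsubscribe: drop it
            }
            delegate.accept(value);
        };
    }

    /** Delivers one event before and one after "unsubscribing". */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Stand-in for Subscription.isUnsubscribed() (assumption for this sketch).
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        Consumer<Integer> second = guarded(unsubscribed::get, i -> log.add("second: " + i));

        second.accept(1);        // delivered
        unsubscribed.set(true);  // simulates mSecond.unsubscribe()
        second.accept(2);        // dropped by the guard
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With RxJava 1.x the supplier would read the subscription's state instead, for example `() -> mSecond.isUnsubscribed()`; using a lambda defers reading the mSecond field until an event actually arrives.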

@alosich
Author

alosich commented Jul 22, 2016

@akarnokd as a consequence, this makes the "unsubscribe" method untrustworthy.

Every time this method is used, we need to additionally check whether the subscriber has been unsubscribed when it receives events.

Consider the following code:

public final class SomeMechanics {
    private final Subscription mEventSubscription;

    public SomeMechanics(final Observable<Event> eventStream) {
        mEventSubscription = eventStream.subscribe(/* ... */);
    } 

    public void stop() {
        mEventSubscription.unsubscribe();
    } 
} 

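This code must check whether mEventSubscription is unsubscribed, because it can't guarantee the conditions under which "stop" is called.

public final class SomeMechanics {
    // Not final: a blank final field cannot be read inside the lambda
    // that is part of its own initializer.
    private Subscription mEventSubscription;

    public SomeMechanics(final Observable<Event> eventStream) {
        mEventSubscription
            = eventStream.subscribe(event -> {
                    // Guard against events delivered after stop().
                    // Assumes the stream does not emit synchronously during subscribe().
                    if (mEventSubscription.isUnsubscribed()) return;

                    // do stuff
                } );
    }

    public void stop() {
        mEventSubscription.unsubscribe();
    }
}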

Moreover, that behavior contradicts the description of the "unsubscribe" method in the documentation: http://reactivex.io/RxJava/javadoc/rx/Subscription.html#unsubscribe()

@akarnokd
Member

See #4225 for an update to PublishSubject which includes a fix for this case.
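
The details of #4225 are not reproduced here. As a rough illustration of how a subject could honor an unsubscribe that happens in the middle of a dispatch, the sketch below checks a per-subscriber flag right before each delivery. `CheckedSubject` and `Entry` are hypothetical names, and this is an assumption about the general approach, not the actual RxJava change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class CheckedDispatchDemo {

    /** Hypothetical subscriber entry that carries its own unsubscribed flag. */
    static final class Entry<T> {
        final Consumer<T> onNext;
        volatile boolean unsubscribed;

        Entry(Consumer<T> onNext) {
            this.onNext = onNext;
        }

        void unsubscribe() {
            unsubscribed = true;
        }
    }

    /** Hypothetical subject that re-checks the flag before every delivery. */
    static final class CheckedSubject<T> {
        private final List<Entry<T>> entries = new ArrayList<>();

        Entry<T> subscribe(Consumer<T> onNext) {
            Entry<T> entry = new Entry<>(onNext);
            entries.add(entry);
            return entry;
        }

        void onNext(T value) {
            // Drop entries that were unsubscribed since the last dispatch.
            entries.removeIf(e -> e.unsubscribed);
            for (Entry<T> entry : new ArrayList<>(entries)) {
                // The extra check makes a mid-dispatch unsubscribe take effect at once.
                if (!entry.unsubscribed) {
                    entry.onNext.accept(value);
                }
            }
        }
    }

    /** Replays the scenario from the report against the checked subject. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CheckedSubject<Integer> subject = new CheckedSubject<>();
        AtomicReference<Entry<Integer>> second = new AtomicReference<>();

        subject.subscribe(i -> {
            second.get().unsubscribe();
            log.add("first: " + i);
        });
        second.set(subject.subscribe(i -> log.add("second: " + i)));

        subject.onNext(1);
        subject.onNext(2);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the second callback is never invoked, at the cost of one extra flag read per subscriber per event.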

@alosich
Author

alosich commented Jul 22, 2016

@akarnokd thanks!
