Skip to content

1.x: fix unsubscription and producer issues in sample(other) #3658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 2, 2016

Conversation

akarnokd
Copy link
Member

This PR fixes 2 bugs with sample

  • Termination of the main or sampler subscriber unsubscribed the child subscriber which is not allowed.
  • The sampler wrapped the child subscriber and thus it allowed setting a producer on the child (thus sampling based on request with some sources).

In addition, #3657 wants to emit the very last item on completion to which I marked the required changes in comments (to be uncommented in a separate PR if needed).

}

@Override
public void onCompleted() {
// no need to null check, main is assigned before any of the two gets subscribed
main.get().unsubscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the order of unsubscribe and onCompleted? Is it possible that s.onCompleted uses some resource in main and that will be destroyed in unsubscribe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main and child are on different subscription container so child can't use anything from main. But if you think otherwise, please write a unit test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The child subscriber may store some resource coming from main and use it in onCompleted, but main will close it in its subscription. E.g.,

   @Test
    public void foo() {
        Observable<InputStream> source = Observable.create(new OnSubscribe<InputStream>() {
            @Override
            public void call(Subscriber<? super InputStream> subscriber) {
                try {
                    final InputStream input = new URL("http://www.google.com").openStream();
                    subscriber.add(Subscriptions.create(new Action0() {
                        @Override
                        public void call() {
                            try {
                                input.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }));
                    Thread.sleep(1000);
                    subscriber.onNext(input);
                    Thread.sleep(1000);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
        source.sample(Observable.interval(1500, 100, TimeUnit.MILLISECONDS).take(2)).subscribe(new Subscriber<InputStream>() {

            private InputStream lastInputStream;

            @Override
            public void onCompleted() {
                if (lastInputStream != null) {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(lastInputStream));
                    String line = null;
                    try {
                        while ((line = reader.readLine()) != null) {
                            System.out.println(line);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(InputStream inputStream) {
                lastInputStream = inputStream;
            }
        });
    }

@akarnokd akarnokd force-pushed the SampleOtherMoreFixes1x branch from 1a1ba81 to 4369e1c Compare February 2, 2016 08:53
@akarnokd
Copy link
Member Author

akarnokd commented Feb 2, 2016

Restored original completion-unsubscription order + verification

@zsxwing
Copy link
Member

zsxwing commented Feb 2, 2016

👍

1 similar comment
@artem-zinnatullin
Copy link
Contributor

👍

akarnokd added a commit that referenced this pull request Feb 2, 2016
1.x: fix unsubscription and producer issues in sample(other)
@akarnokd akarnokd merged commit f4be307 into ReactiveX:1.x Feb 2, 2016
@akarnokd akarnokd deleted the SampleOtherMoreFixes1x branch March 14, 2016 01:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants