Skip to content

Observable.using() diposeEagerly clarification. #4144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
mfycheng opened this issue Jun 29, 2016 · 4 comments
Closed

Observable.using() diposeEagerly clarification. #4144

mfycheng opened this issue Jun 29, 2016 · 4 comments
Labels

Comments

@mfycheng
Copy link

From my understanding, if disposeEagerly == false when calling Observable.using(resourceFactory, observableFactory, diposeEagerly), then the disposeAction should not be called when a termination event occurs.

However, based on:

.doAfterTerminate(disposeOnceOnly);

it appears that it just changes whether or not disposeAction is called before or after the termination event.

Is my understanding correct? And if so, is this the desired behaviour? The documentation for Observable.using(resourceFactory, observableFactory) implies that disposeAction is is only called on unsubscribe.

@zsxwing
Copy link
Member

zsxwing commented Jun 29, 2016

it appears that it just changes whether or not disposeAction is called before or after the termination event.

This is correct.

And if so, is this the desired behaviour? The documentation for Observable.using(resourceFactory, observableFactory) implies that disposeAction is is only called on unsubscribe.

Yes. Remember that a termination event will trigger unsubscribe.

@mfycheng mfycheng reopened this Jun 29, 2016
@mfycheng
Copy link
Author

Using the example code:

Observable<Long> emitter = Observable.interval(2, TimeUnit.SECONDS)
  .limit(5)
  .doOnEach(value -> log.debug("emitted: {}, value))
  .switchMap(value -> {
      Observable<Long> obs = Observable.using(
          () -> value,
          Observable::just,
          result -> {
            log.debug("disposeAction({})", result);
          });

    return obs.doOnTerminate(() -> log.debug("doOnTerminate({})", value))
        .doOnUnsubscribe(() -> log.debug("doOnUnsubscribe({})", value));
    });

TestSubscriber<Long> subscriber = new TestSubscriber<>();
emitter.subscribe(subscriber);
subscriber.awaitTerminalEvent();

in RxJava < 1.1.6, the output is the following:

---
2170 [RxComputationScheduler-3] DEBUG com.mfycheng.App - emitted: 0
2177 [RxComputationScheduler-3] DEBUG com.mfycheng.App - doOnTerminate(0)
---
4169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - emitted: 1
4169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - doOnUnsubscribe(0)
4169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - disposeAction(0)
4169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - doOnTerminate(1)
---
6169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - emitted: 2
6169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - doOnUnsubscribe(1)
6169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - disposeAction(1)
6169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - doOnTerminate(2)
---
8169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - emitted: 3
8169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - doOnUnsubscribe(2)
8169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - disposeAction(2)
8169 [RxComputationScheduler-3] DEBUG com.mfycheng.App - doOnTerminate(3)

whereas the output for 1.1.6 is:

---
2160 [RxComputationScheduler-1] emitted: 0
2166 [RxComputationScheduler-1] doOnTerminate(0)
2166 [RxComputationScheduler-1] disposeAction(0)
---
4156 [RxComputationScheduler-1] emitted: 1
4156 [RxComputationScheduler-1] doOnUnsubscribe(0)
4156 [RxComputationScheduler-1] doOnTerminate(1)
4156 [RxComputationScheduler-1] disposeAction(1)
---
6158 [RxComputationScheduler-1] emitted: 2
6158 [RxComputationScheduler-1] doOnUnsubscribe(1)
6158 [RxComputationScheduler-1] doOnTerminate(2)
6158 [RxComputationScheduler-1] disposeAction(2)
---
8157 [RxComputationScheduler-1] emitted: 3
8157 [RxComputationScheduler-1] doOnUnsubscribe(2)
8157 [RxComputationScheduler-1] doOnTerminate(3)
8157 [RxComputationScheduler-1] disposeAction(3)

The primary difference being that the disposeAction() is called immediately in 1.1.6, when it used to be called when unsubscribe was called. The documentation implies that if disposeEagerly == false, then the disposeAction() will occur on unsubscribe, which in this case occurs at a different point in time

The use case for this is where a resource (created by Observable.using()) exists until a new value is emitted by the original observable, at which point the existing resource should be disposed of, and new one created. Upgrading to 1.1.6 breaks this as the resource gets disposed of immediately.

Is this a misuse of Observable.using()? Or should disposeEagerly == false actually delay calling the disposeAction() until the unsubscribe actually occurs.

@akarnokd akarnokd changed the title Question: Observable.using() diposeEagerly clarification. Observable.using() diposeEagerly clarification. Jun 29, 2016
@akarnokd
Copy link
Member

The latter behavior is the correct one and is the result of the bugfix in #3922. If the generated Observable terminates, the resources are released. Before that, the resource was kept open indefinitely even after its associated Observable was done.

@mfycheng
Copy link
Author

mfycheng commented Jun 29, 2016

I see. So in this specific case, if I wanted to delay the disposeAction() until unsubscribe (i.e. the behaviour of < 1.1.6), I should us an observable factory that never emits a terminating event? Something like:

Observable<Long> obs = Observable.using(
          () -> value,
          resource -> Observable.create(subscriber -> {
               subscriber.onNext(resource);
          }
          result -> {
            log.debug("disposeAction({})", result);
          });

or alternatively:

Observable<Long> obs = Observable.using(
          () -> value,
          resource -> Observable.just(resource).concatWith(Observable.never()),
          result -> {
            log.debug("disposeAction({})", result);
          });

It seems a little sketchy, but appears to achieve the desired behaviour

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants