
"Gate" operator #4210


Closed
ghost opened this issue Jul 18, 2016 · 8 comments

@ghost

ghost commented Jul 18, 2016

I've written a custom RxJava operator and wanted to check whether this was the best way to do what I want and also to make sure it is implemented correctly. Code is here.

My use case for this operator is an Android RecyclerView backed by an Observable. In my case the Observable reads from the local database and might make some network calls. The issue is that a user could have hundreds of items produced by this Observable, and each one might require its own network call. Since I don't want to make hundreds of network calls at once, I want the RecyclerView to load the values lazily. Think of an infinitely scrolling RecyclerView.

The operator takes an Observable<Long>; whenever that Observable emits a value n, the operator requests n values from the Producer of the main Observable. For example:

class RecyclerViewGate {
  PublishRelay<Long> relay = PublishRelay.create();
  AtomicBoolean isLoadingBatch = new AtomicBoolean(false);
}


RecyclerViewGate gate = new RecyclerViewGate();


readFromDatabase()
    .buffer(batchSize)
    .lift(GateObservable.blockUsingGate(() -> gate.relay.startWith(1L)))
    .doOnNext(x -> gate.isLoadingBatch.set(true))
    .flatMap(doNetworkRequests())
    .doOnNext(x -> gate.isLoadingBatch.set(false))
    .subscribe(x -> adapter.addItems(x));


void onRecyclerViewScrolled() {
  if (!gate.isLoadingBatch.get() && getLastVisibleItemPosition() + bufferSize > adapter.getItemCount()) {
    gate.relay.call(1L);
  }
}
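
As a rough illustration of the behaviour described above, the following plain-Java model pulls at most n items from a source each time the gate is opened with n. It is only a sketch: GateSketch and open are hypothetical names, an Iterator stands in for the upstream Producer and a Consumer for the child Subscriber, and it is not the operator from the gist.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of the gate idea (hypothetical; not the gist's operator). */
final class GateSketch<T> {
    final Iterator<T> source;      // stands in for the upstream Producer
    final Consumer<T> downstream;  // stands in for the child Subscriber

    GateSketch(Iterator<T> source, Consumer<T> downstream) {
        this.source = source;
        this.downstream = downstream;
    }

    /** Called when the gate emits n: deliver at most n items, return how many were delivered. */
    synchronized long open(long n) {
        long delivered = 0;
        while (delivered < n && source.hasNext()) {
            downstream.accept(source.next());
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<List<Integer>> shown = new ArrayList<>();
        Iterator<List<Integer>> batches = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5)).iterator();
        GateSketch<List<Integer>> gate = new GateSketch<>(batches, shown::add);

        gate.open(1); // plays the role of startWith(1L): load the first batch immediately
        System.out.println(shown);
        gate.open(1); // plays the role of relay.call(1L) after the user scrolls
        System.out.println(shown);
    }
}
```

The real operator has to do the same thing asynchronously, which is why request accounting and serialization of events come up in the replies.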

My implementation seems to work, but I would like an RxJava expert's opinion on whether it's correct (or even if it's a sensible approach) since I've never written an operator before.

@akarnokd
Member

Looks interesting, here are my thoughts:

  • L58 Try to avoid subscribing in the constructor; use unsafeSubscribe while you are at it.
  • L67 A terminal event from the gate may still race with an onNext event from main; you have to wrap the child Subscriber in a SerializedSubscriber just before L41. Then you don't need to CAS into the completed state either.
  • L104 You don't seem to use the value of pendingRequest.
  • L114 This works, but you don't actually need an arbiter: just expose the main subscriber's request method as requestGate, have the GatedProducer hold a reference back to the main subscriber and, where you wrote arbiter.request, write parent.requestGate(toRequest). In this case, you don't have to override the main's setProducer either.
  • L117 and L136 look quite intriguing, but they look okay.
  • Generally avoid private methods and classes; that saves you a bunch of hidden bridge methods, especially on Android.

I've never written an operator before

Actually, it's a nice job for a first-time writer.

or even if it's a sensible approach

We don't have any operators that let you do this in-sequence request manipulation.

@abersnaze
Contributor

I've been interested in high-level operators that use Observable APIs to manipulate the behavior, rather than the data, of other Observables. For example, here are a few of the things that I've been trying to work on:

  • retryWhen & repeatWhen, which control re-subscription. They need better documentation and a bunch of recipes that users can use to get the behavior they are looking for.
  • more recently Scheduler.when, which controls when async tasks are run.
  • I also wrote up valve (#3781, "1.x: Add an operator to throttle data via controlling the requests going upstream"), which I haven't had time to get back to. This one seems most like what you're trying to do here.

This seems interesting, but I'm wondering if the API could be implemented as an Observable chain to allow others to come up with their own behaviors by composing standard operators.

@ghost
Author

ghost commented Jul 18, 2016

@akarnokd I've updated the gist with your suggestions.

L58 Try to avoid subscribing in the constructor; use unsafeSubscribe while you are at it

Done.

L67 a terminal event from the gate may still race with an onNext event from main, you have to wrap the child Subscriber into SerializedSubscriber just before L41. Then you don't need to CAS into the completed state either.

I've wrapped the child with SerializedSubscriber, but I'm not sure what you mean about not needing to CAS. Do you mean I no longer need to use an AtomicBoolean? Or do you mean I could do

public void onCompleted() {
  if (!completed.get()) {
    completed.set(true);
    child.onCompleted();
  }
}

in which case compareAndSet looks nicer, so unless there's a performance improvement I prefer compareAndSet.

L104 you don't seem to use the value of the pendingRequest

AtomicLong doesn't have a plain add method; addAndGet is the best alternative.
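
For illustration, a request counter built on AtomicLong can ignore the returned value at the call site and still be correct. The sketch below also saturates at Long.MAX_VALUE so that accumulated requests cannot overflow. RequestCounter and its methods are hypothetical names, n is assumed to be non-negative, and this is not the code from the gist.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical pending-request counter that saturates instead of overflowing. */
final class RequestCounter {
    final AtomicLong pending = new AtomicLong();

    /** Adds n (assumed >= 0) to the pending count and returns the previous value. */
    long add(long n) {
        for (;;) {
            long current = pending.get();
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // the sum overflowed, so clamp it
            }
            if (pending.compareAndSet(current, next)) {
                return current; // callers are free to ignore this
            }
        }
    }

    long get() {
        return pending.get();
    }

    public static void main(String[] args) {
        RequestCounter counter = new RequestCounter();
        counter.add(5);              // return value ignored, as in the discussion
        counter.add(Long.MAX_VALUE); // would overflow without the clamp
        System.out.println(counter.get() == Long.MAX_VALUE);
    }
}
```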

L114 This works, but you don't actually need an arbiter: just expose the main subscriber's request method as requestGate, have the GatedProducer hold a reference back to the main subscriber and, where you wrote arbiter.request, write parent.requestGate(toRequest). In this case, you don't have to override the main's setProducer either.

Done.

L117 and L136 look quite intriguing, but they look okay.

Do you want more explanation of what's going on? Basically, the AllowedRequests object stores the pending state, i.e. either how many items have been requested or how many allowed requests are left. I couldn't think of better names. I based it on how take limits the number of requests.
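
One way to picture this kind of pending state is a small ledger that tracks two balances, what downstream has requested and what the gate has allowed, and forwards the smaller of the two upstream. The sketch below is an assumption about the general idea, not the AllowedRequests class from the gist; RequestLedger, request and allow are hypothetical names, and Long.MAX_VALUE is treated as "unbounded" in the usual Rx sense.

```java
/** Hypothetical ledger pairing downstream requests with gate allowances. */
final class RequestLedger {
    long requested; // asked for by downstream, not yet forwarded upstream
    long allowed;   // permitted by the gate, not yet used

    /** Downstream asked for n more items; returns how many to request upstream now. */
    synchronized long request(long n) {
        requested = addCap(requested, n);
        return drain();
    }

    /** The gate allowed n more items; returns how many to request upstream now. */
    synchronized long allow(long n) {
        allowed = addCap(allowed, n);
        return drain();
    }

    /** Forwards min(requested, allowed) and deducts it from both bounded balances. */
    long drain() {
        long n = Math.min(requested, allowed);
        if (requested != Long.MAX_VALUE) {
            requested -= n; // an unbounded request stays unbounded
        }
        if (allowed != Long.MAX_VALUE) {
            allowed -= n;
        }
        return n;
    }

    /** Adds two non-negative values, clamping to Long.MAX_VALUE on overflow. */
    static long addCap(long a, long b) {
        long sum = a + b;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        RequestLedger ledger = new RequestLedger();
        System.out.println(ledger.request(3)); // nothing allowed yet
        System.out.println(ledger.allow(2));   // two of the three requests can go upstream
    }
}
```

Only one of the two balances is non-zero after each call, which matches the description of storing either the outstanding requests or the remaining allowance.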

generally avoid private methods and classes, saves you a bunch of hidden bridge methods, especially on Android.

Nice tip, thanks!

@akarnokd
Member

but I'm not sure what you mean about not needing to CAS

public void onCompleted() {
    child.onCompleted();
}
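
The reasoning behind dropping the CAS can be sketched in plain Java: once a serializing wrapper guarantees that events are delivered one at a time and that nothing follows a terminal event, the code behind it can call onCompleted unconditionally. The Sink interface and SerializedSink class below are hypothetical, simplified stand-ins; the real SerializedSubscriber avoids holding a lock while calling downstream, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-in for an Observer, for illustration only. */
interface Sink<T> {
    void onNext(T value);
    void onCompleted();
}

/** Simplified serializing wrapper: one event at a time, nothing after completion. */
final class SerializedSink<T> implements Sink<T> {
    final Sink<T> actual;
    boolean done;

    SerializedSink(Sink<T> actual) {
        this.actual = actual;
    }

    @Override
    public synchronized void onNext(T value) {
        if (!done) {
            actual.onNext(value);
        }
    }

    @Override
    public synchronized void onCompleted() {
        if (!done) {
            done = true;
            actual.onCompleted();
        }
    }

    public static void main(String[] args) {
        final List<String> events = new ArrayList<>();
        Sink<Integer> child = new SerializedSink<>(new Sink<Integer>() {
            @Override public void onNext(Integer value) { events.add("next:" + value); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        child.onNext(1);
        child.onCompleted(); // e.g. the main source completes
        child.onCompleted(); // e.g. the gate completes too; the wrapper drops it
        System.out.println(events);
    }
}
```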

Do you want more explanation on what's going on?

No I get it.

@ghost
Author

ghost commented Jul 19, 2016

@abersnaze

retryWhen & repeatWhen, which control re-subscription. They need better documentation and a bunch of recipes that users can use to get the behavior they are looking for.

I don't think that would work in my case since I'm not resubscribing to Observables. I probably could adapt an Observable to work like that, but it wouldn't be as clear.

more recently Scheduler.when that controls when async tasks are run.

That could work, something like

readFromDatabase()
    .observeOn(Schedulers.io().when(runTasksBasedOnRecyclerView()))
    ...

I also wrote up valve (#3781), which I haven't had time to get back to. This one seems most like what you're trying to do here.

That does look similar. Something like this:

PublishRelay<Boolean> relay = PublishRelay.create();

readFromDatabase()
    .pressureValve(relay, 1)
    .doOnNext(x -> relay.call(false))
    ...

void loadMore() {
  relay.call(true);
}

I haven't tested either of these approaches yet. It seems pressureValve isn't part of a stable release yet, and when is in 1.1.7, but we're currently using 1.1.6 and I don't have time to update at the moment. Both look promising though.

This seems interesting, but I'm wondering if the API could be implemented as an Observable chain to allow others to come up with their own behaviors by composing standard operators.

I did actually have a version using zip, where I zipped the main Observable with a second Observable whose values were ignored but whose events triggered loading the next main item.
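
The pairing behaviour of such a zip-based version can be modelled in plain Java: each trigger event releases exactly one main item, and the trigger's own value is discarded. ZipGateSketch, onItem and onTick are hypothetical names for this sketch. Note that in this model main items that arrive before their trigger simply wait in a buffer, so the source itself is not slowed down; whether the real zip operator limits upstream requests enough for a given use case is worth checking separately.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical model of zipping a main stream with a value-less trigger stream. */
final class ZipGateSketch<T> {
    final Queue<T> items = new ArrayDeque<>(); // main items waiting for a trigger
    long ticks;                                // triggers waiting for a main item
    final Consumer<T> downstream;

    ZipGateSketch(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** A main item arrived. */
    synchronized void onItem(T item) {
        items.add(item);
        drain();
    }

    /** The trigger fired; its value is irrelevant, only the event counts. */
    synchronized void onTick() {
        ticks++;
        drain();
    }

    /** Emits one main item per available trigger, in arrival order. */
    void drain() {
        while (ticks > 0 && !items.isEmpty()) {
            ticks--;
            downstream.accept(items.poll());
        }
    }

    public static void main(String[] args) {
        List<String> shown = new ArrayList<>();
        ZipGateSketch<String> zip = new ZipGateSketch<>(shown::add);
        zip.onItem("a");
        zip.onItem("b"); // buffered: no trigger yet
        zip.onTick();    // releases "a"
        System.out.println(shown);
    }
}
```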

@ghost
Author

ghost commented Jul 19, 2016

Closing since I'm happy with the implementation, thanks for all the help.

@ghost closed this as completed Jul 19, 2016

@davidmoten
Collaborator

@abersnaze

For example, here are a few of the things that I've been trying to work on:
retryWhen & repeatWhen, which control re-subscription. They need better documentation and a bunch of recipes that users can use to get the behavior they are looking for.

I've also found retryWhen a bit low-level to use so rxjava-extras has a RetryWhen builder. I think rxjava core users could benefit from a similar approach so they can discover the behaviour they want a bit more readily. Couchbase also has a RetryWhen builder I believe. Just thought I'd mention it.

@abersnaze
Contributor

Sorry @GrahamRogers, I wasn't trying to suggest that you use retryWhen or Scheduler.when. I was trying to express a theme that I've been following: operators that use Observables to compose behavior rather than to process data. I was wondering if gate could follow that theme?

@davidmoten That was one of the steps along the way in the development of "getting more ways to configure retrying":

  1. Do we add a bunch of retry overloads to Observable? (Too inflexible, and we'd be constantly adding more.)
  2. Do we make a builder of behavior? (Observable is a builder of behavior.)
  3. retry and repeat both manipulate re-subscriptions when the source Observable stops.

That is how OnSubscribeRedo came to be the mother of all subscription mangling. I've been thinking that, now that we have Single and Completable, it might be easier to constrain what is expected of the function.

This issue was closed.