Skip to content

Conversation

@akarnokd
Copy link
Member

@akarnokd akarnokd commented May 25, 2016

This PR removes the immediate() scheduler "optimization" from observeOn and treats it as a common scheduler. Since observeOn has a stable request pattern, this turns it into a rebatching operator. No matter what the downstream requests, the upstream will requests of the specified size (with 25% as low water mark; i.e., replenishment after 75%). Since immediate is synchronous, this will run the drain loop, non-reentrant, on the caller thread.

I found this mode of operation very handy in my Reactive-RPC prototype and a simple streaming echo RPC call; it prevents going unbounded and bloating the message sender threads:

// remote
public Observable<Integer> echo(RpcStreamContext<?> ctx, Observable<Integer> in) {
   return in.observeOn(Schedulers.immediate(), 16);
}

// client
api.echo(Observable.range(1, 100_000)).observeOn(Schedulers.immediate(), 32)
.subscribe(System.out::println, Throwable::printStackTrace);

@davidmoten
Copy link
Collaborator

Great! What about adding a method to Observable that calls it so this functionality is discoverable? Say Observable.batchRequests(n)?

@akarnokd
Copy link
Member Author

That could be a follow-up PR if this gets accepted.

@zsxwing
Copy link
Member

zsxwing commented May 30, 2016

Why not just adding a batchRequests operator instead of changing the existing behavior? This probably affects some users' applications if they depends on the optimization.

Secondly, using observeOn(Schedulers.immediate(), 16) and assuming how it implements is not reliable. This is not an API contract and it could be changed in any time.

@akarnokd
Copy link
Member Author

The alternative requires duplicating code, which triggers more outcry usually...

@zsxwing
Copy link
Member

zsxwing commented May 31, 2016

I think you can add a flag (e.g., disallowOptimization) to OperatorObserveOn and use it for batchRequests.

@akarnokd akarnokd closed this May 31, 2016
@akarnokd akarnokd deleted the ObserveOnRebatcher branch June 28, 2016 09:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants