
1.x: Upgrading AsyncOnSubscribe from Experimental to Beta #3817



stealthcode

No description provided.

@stealthcode
Author

see #3780 for context.

@stealthcode
Author

The AsyncOnSubscribe was motivated in part by asynchronous, paginated streaming processes. An example is fulfilling a stream of data via RPC with a minimal amount of buffering on the consuming client. Buffering operators (such as observeOn) issue subsequent requests upstream, and when the algorithms governing these semantics are inefficient (i.e. request-1/fulfill-1 loops) the AsyncOnSubscribe may be invoked many more times than necessary.

However severe or trivial this impact may be, I do not see it as a problem with AsyncOnSubscribe; it is a bug in the buffering operators that needs to be fixed. Any observable which fulfills requests from an async process (i.e. any RPC) will be impacted by this inefficient request behavior, which forces user implementations to buffer and/or reorder the stream of data.

The AsyncOnSubscribe is being used inside Netflix in the way described above by @kurzweil (see comment). This functionality provides a safe way to work around the problems of inefficient request semantics, and once those problems are resolved it is a good API for streaming asynchronous data in precisely the amount requested by the consumer.
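
For concreteness, here is a minimal sketch of the kind of source being described, assuming a hypothetical fetchPage(offset, count) RPC that asynchronously returns an Observable of results (the names are illustrative only, not an existing API):

Observable<String> rows = Observable.create(new AsyncOnSubscribe<Long, String>() {
    @Override
    protected Long generateState() {
        return 0L; // offset into the remote data set
    }

    @Override
    protected Long next(Long offset, long requested,
            Observer<Observable<? extends String>> observer) {
        // fulfill exactly the requested amount with a single async RPC call
        observer.onNext(fetchPage(offset, requested));
        return offset + requested;
    }
});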

@akarnokd you are the committer who has been most vocally against this upgrade. Can you please post your concerns with the API?

@akarnokd
Member

You want to rely on the request pattern, which would now have to be changed almost everywhere and become part of the public API just to satisfy this operator. A single third-party lifted operator in the chain and the whole thing may be broken again. You need tight control of the request amount, which only happens reliably with a same-thread consumer (so no observeOn, etc.).

In addition, the operator can't ensure the specified amount is honored by the source it generates (i.e., the last page may be shorter than the page size); overproduction can shift the whole output paging, and underproduction triggers a new round to generate the missing amount. The only alternative for handling bad sources is to signal an error in either case. (Btw., @kurzweil's example doesn't use the requested parameter with the response.)

You quoted RPC and paging as the main target for AsyncOnSubscribe. RPC requires bi-di streaming and paging requires streaming+backpressure over a network boundary, both beyond RxJava.

@stealthcode
Author

None of your points has anything to do with the API of AsyncOnSubscribe. Those problems exist, but they do not affect whether we have the correct API for the use case. That is why this should be upgraded and the request(1) usage should be improved.

RPC requires bi-di streaming and paging requires streaming+backpressure over a network boundary, both beyond RxJava.

The purpose of the AsyncOnSubscribe is to provide a safe way for a consumer of an RPC to stream data. The goal is not to provide streaming over network boundaries. That is the goal of ReactiveSocket.

So I'm still at a loss as to why you argue against an API for a use case that is clearly not accounted for in RxJava. Can you give me any alternative for solving the consumer-driven RPC case with minimal buffering that maintains ordering?

@akarnokd
Member

Yes, swing around the request parameter into a PublishSubject that gets flatMapped and observed on at will:

PublishSubject<Request> req = PublishSubject.create();

req.flatMap(r -> service(r)).observeOn(Schedulers.computation()).subscribe(v -> {
   process(v);
   if (i++ % 20 == 0) {
       req.onNext(nextPage);
   }
});

req.onNext(initialRequest);

@stealthcode
Author

Okay, thanks. This looks interesting but could you please explain your code example a little bit? What is nextPage? The recursive subscription of req is confusing me. Why do you call req.onNext from within the subscribe of req?

@akarnokd
Member

akarnokd commented Apr 1, 2016

To start the next round of values with different parameters for service().

@stealthcode
Author

I'm sorry but I don't understand it. Can you show some usage? Is this supposed to be implemented in a Subscriber?

I'm still not seeing any necessary change to the API. There is still a need to have direct control over providing data in precisely the amount requested by the consumer. This seems like an implementation that buffers/collects requests to be fulfilled at some time in the future. If so, it could be used as a stop-gap solution with AsyncOnSubscribe until the request behaviors are corrected.

@akarnokd
Member

akarnokd commented Apr 1, 2016

You try to reinterpret the request() call, which only works if the producer and the consumer are next to each other without any intermediate operators. It's the same problem as when people try to interpret the request amount as the number of bytes to return when the sequence type is byte[]. It won't work properly.

By using the PublishSubject above, you have the option to control the parameters to the service and then consume the resulting data at will. Then, signal the next set of parameters through the subject, which calls the service, which then creates another Observable sequence. This setup doesn't repurpose the backpressure-request amount and can be freely transformed further.

public class ServicePaging {
    static Observable<Integer> service(int start, int size) {
        return Observable.range(start, size);
    }

    // Pair is assumed to be a simple 2-tuple with public first/second fields and a static of() factory
    static Pair<Observer<Integer>, Observable<Integer>> repeatingService() {

        Subject<Integer, Integer> ps = PublishSubject.<Integer>create().toSerialized();

        return Pair.of(ps, ps.concatMap(v -> service(v, 20)));
    }

    public static void main(String[] args) {

        Pair<Observer<Integer>, Observable<Integer>> pair = repeatingService();

        pair.second.subscribe(new Subscriber<Integer>() {
            int totalReceived;
            int received;

            int index;

            @Override
            public void onNext(Integer t) {
                System.out.println(index + " - " + t);

                if (++received == 20) {
                    index += 20;
                    received = 0;
                    pair.first.onNext(index);
                }

                if (++totalReceived == 120) {
                    pair.first.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Done");
            }
        });

        pair.first.onNext(0);
    }
}

@stealthcode
Author

Your code example shows a way to protect a producer from the request-1/produce-1 problem of observeOn by buffering onNexts and mapping each batch of them to a call to a service. I don't see how this actually relates. The purpose of the AsyncOnSubscribe is to provide a way for users to create observables that support backpressure over asynchronous boundaries with as little buffering as possible. Your code example does not monitor the request amount, so the number of elements produced by service cannot be constrained to the downstream operator buffer sizes. Also, this example offers no capability for pre-fetching (i.e. eagerly requesting more data from the service while the buffers are draining). And one more thing that I think is relatively trivial to fix but is still a jagged edge to consider: when the producer of onNext values is slow, you may sit on an interim (i.e. between 0 and 20) request value for a long time without fulfilling that request.

I still don't see what the holdup is. Why not use request values for what they are intended for?

@stealthcode
Author

I just want to clarify: I don't think it's absolutely necessary to fix all cases of this request behavior right now, and I'm not talking about promoting it to public (i.e. non-@Beta) immediately. I think we should make it safe for use before it is made public. What I'm really looking for from you, @akarnokd, is some sort of agreement that this is a desirable direction. If so, then we can move the conversation away from the API itself and on to the more productive work of fixing the request pattern. If you disagree with this API's usage or existence, then we can argue about its existence and maybe there's no point in moving forward at all. So could you please tell me where you stand on using backpressure to negotiate data over an async boundary via the AsyncOnSubscribe?

@akarnokd
Member

akarnokd commented Apr 4, 2016

Backpressure amounts are established between consecutive producer-consumer pairs in the operator chain, depending on the efficiency of moving data across a potential (i.e., flatMap) or actual (i.e., observeOn) async boundary. It is not there to propagate a specific request amount from the end consumer up to the main source; that requires a different protocol (i.e., bi-di streams).

But let's assume for a second it were so. The consequence would be that unless the end-consumer's request amount is constant, operators such as observeOn and flatMap can't work with a fixed capacity queue anymore. (Even if they go unbounded, a request with Long.MAX_VALUE puts us back to pre-backpressure era.) With that, having a flatMap mapping multiple values into AsyncOnSubscribe sources, what request amount should they receive? The original request going to each? Uniformly distributed, even if the number of sources is unknown? One by one?

The purpose of the AsyncOnSubscribe is to provide a way that users can create observables that support back pressure over asynchronous boundaries with as little buffering as possible.

In the current setup, this only works if the source and consumer are right next to each other since they reinterpret the request amount, making it a synchronous operator.

Your code example does not monitor the request amount so the number of elements produced by service cannot be constrained to the downstream operator buffer sizes.

Generally, you should be able to tell the service how many elements you'd like to receive before getting an Observable back; it is a constant 20 in my example.

Also this example offers no capability of pre-fetching (i.e. buffers are draining and we request more data from the service eagerly)

In my example, I didn't add observeOn to make things simple. Since nothing depends on the backpressure-request size in the example, an observeOn would prefetch as usual.

In addition, the prefetch you want contradicts requesting N from the service where N comes from the end consumer. If requesting is left as it is now in RxJava, you get both prefetch and on-demand delivery.

Bottom line: sources such as AsyncOnSubscribe that try to repurpose backpressure requesting can't work with async intermediate operators that change request amounts between upstream and downstream.

@stealthcode
Author

It is not there to propagate a specific request amount from the end consumer up to the main source; that requires a different protocol (i.e., bi-di streams).

I don't think I have ever said that this is how backpressure works. Many times the request amounts are simply forwarded (i.e. trivial backpressure for operators like scan, etc.). Your definition sounds precisely like what I am trying to say. However, if your consumer chain has a buffer in place (i.e. a take, observeOn, flatMap, etc.) then the consuming operator will have a constraint on its request amount. So in general the consumer observable chain will have some finite but dynamic request amount.

But let's assume for a second it were so. The consequence would be that unless the end-consumer's request amount is constant, operators such as observeOn and flatMap can't work with a fixed capacity queue anymore. (Even if they go unbounded, a request with Long.MAX_VALUE puts us back to pre-backpressure era.)

I am not sure what any of this has to do with our discussion since I am not talking about constant request amounts.

what request amount should they receive?

The AsyncOnSubscribe should fulfill data from its producer in exactly the amount requested. This should be however much is requested by the downstream consumer.

observeOn would prefetch as usual

In your example there is no ordering of multiple concurrent fulfilling requests. For example, the consuming subscriber requests a batch of 5000 (and has a buffer size of 5000). While the provider is busy producing the 5000, the buffer in the consumer drains 2000 entries and can then request 2000 more (7000 total), even though the original 5000 have not finished arriving from the producer. That requires concurrent batches while preserving order, and there is nothing in your example that can handle it.

In the current setup, this only works if the source and consumer are right next to each other since they reinterpret the request amount, making it a synchronous operator.

What do you mean by a synchronous operator, and why is this significant? Operators are synchronous in that there cannot be interleaving onNexts, so I am not sure you understand what the goal is.

The goal of AsyncOnSubscribe is for a consumer to request arbitrary amounts of data and for the producer to provide exactly that amount. It does not matter how big or small these amounts are, nor should it matter when those requests are made. This is useful when integrating a system that cannot provide an Rx Observable interface, for instance a message-passing system like ActiveMQ or SQS which receives data and must buffer it or risk violating backpressure rules. In a naive implementation one might wrap the message stream via a listener and observe the data source directly; this clearly does not satisfy backpressure when there is a slow consumer. As a next iteration one could use Observable.create(...) and do one's best at implementing an OnSubscribe that provides a Producer and reads from the data source while respecting backpressure, but in that implementation you have to worry about interleaving requests.

The purpose of this is to integrate producers which have no concept of backpressure (e.g. sampling mouse events or paginating over data services) and to request ONLY the requisite amount at a time. This seems like exactly the purpose behind backpressure.
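
As a rough sketch of that kind of integration (Message, queueClient and receiveBatch(client, n) are hypothetical placeholders for whatever client API is in use; receiveBatch is assumed to asynchronously return at most n messages as an Observable):

Observable<Message> messages = Observable.create(new AsyncOnSubscribe<Long, Message>() {
    @Override
    protected Long generateState() {
        return 0L; // simple batch counter; the queue client itself tracks the read position
    }

    @Override
    protected Long next(Long batch, long requested,
            Observer<Observable<? extends Message>> observer) {
        // pull only as many messages as the downstream consumer has requested
        observer.onNext(receiveBatch(queueClient, requested));
        return batch + 1;
    }
});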

Bottom line is that sources such as AsyncOnSubscribe trying to repurpose the backpressure requresting can't work with async intermediate operators that change request amounts between upstream and downstream.

Can you tell me how the AsyncOnSubscribe changes the request amounts? They should be exactly the amounts requested by the consumer. Also, this is not a "repurposing" of backpressure. The purpose of backpressure was to minimize unbounded buffering between a producer and a consumer, and the AsyncOnSubscribe does exactly this while preserving the ordering of data.

@akarnokd
Member

akarnokd commented Apr 4, 2016

I simply can't understand what you were trying to accomplish. All I see is mixed concepts and interpretations of request, backpressure, RPC-like behavior, bi-directional value passing. If you want to Rx-ify a pull-like data source, SyncOnSubscribe does the job. If you want to Rx-ify a source that just pushes out values, you have buffer/drop/sample/block to make it stop. If you can tell a source outside RxJava to produce N items, make it like this:

Observable<GroupedObservable<Request, T>> service(Observable<Request> parameters);

where Request can hold the number of items you want at once from the paired GroupedObservable. Then you can concat/merge/concatEager the outer Observable to get a single stream of Ts. If parameters is a PublishSubject or UnicastSubject, your consuming end Subscriber only has to call onNext on it (instead of its request(n) method) whenever it needs the next "batch".
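
For example, consuming that shape could look something like this (Request, service and process are placeholders, not an existing API):

PublishSubject<Request> requests = PublishSubject.create();
int[] received = { 0 };

service(requests)
    .concatMap(group -> group)                // flatten the batches in request order
    .subscribe(v -> {
        process(v);
        if (++received[0] % 20 == 0) {
            requests.onNext(new Request(20)); // ask for the next batch of 20
        }
    });

requests.onNext(new Request(20));             // kick off the first batch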

@stealthcode
Author

I have no other way to try to tell you that we need a way to convert request(n) calls into a stream of potentially concurrent batch Observable<T>s.

@benjchristensen this functionality was your vision. Perhaps you can communicate it best.

@abersnaze
Contributor

@akarnokd I was just talking with @kurzweil about this and it got me thinking about your last comment. Could we invert the service method you mentioned to something like

public static <T> Observable<T> create(final Transformer<Long, T> requestsToValues)

It would be a lot like retryWhen, where the author of the transformer would have to compose operators like scan to maintain state across requests, and merge, concat, or concatEager to combine the values from multiple requests back into one stream of values.

@akarnokd
Member

akarnokd commented Apr 7, 2016

Do you mean rx.Observable.Transformer? That requires a source Observable, which is available in Observable.compose as this.

I can't think of a version based on the retryWhen/repeatWhen signature. It seems you want the end-subscriber to be in charge of the amount to be generated in one batch, thus it has to know about the front to signal the generator.

There is an alternative signature to my method:

Subject<Long, T> service(Func1<Long, Observable<T>> generator, JoinMode mode);

But this may also require a special boundary indicator T for the Subscriber to know when to call onNext on the subject.

@abersnaze
Contributor

If you mean rx.Observable.Transformer? That requires a source Observable which is available in Observable.compose as this.

Eclipse doesn't seem to mind that I don't have an instance of Observable. Maybe because it is an inner interface?

I can't think of a version based on the retryWhen/repeatWhen signature.

Not literally. The retryWhen operator takes a Func1<Observable<Throwable>, Observable<?>> which could be rewritten to Transformer<Throwable, ?>.

It seems you want the end-subscriber to be in charge of the amount to be generated in one batch, thus it has to know about the front to signal the generator.

Not really, no. The values emitted to the Observable<Long>'s onNext would correspond 1:1 with request(n) calls to the producer.

There is an alternative signature to my method:
Subject<Long, T> service(Func1<Long, Observable<T>> generator, JoinMode mode);

I guess, but it seems strange to have a new type to describe something that can be done by applying an existing operator. It also limits the user to only the things we've thought of, like in a bizarre world where someone wants to use switchMap to cancel the previous batch and start a new one on each request.

The place where my idea gets awkward is how the returned observable terminates. The requests Observable<Long> isn't going to terminate, so the transformer author will have to find some way to have an inner observable complete the outer observable.

    public static <T> Observable<T> create(final Transformer<Long, T> requestsToValues) {
        return create(subscriber -> {
            PublishSubject<Long> requestsSubject = PublishSubject.create();
            // forward downstream request(n) calls into the requests subject
            subscriber.setProducer(requestsSubject::onNext);
            requestsToValues.call(requestsSubject).subscribe(subscriber);
        });
    }

Here is a proof of concept of what it may look like to use it.

public class Main {
    public static void main(String[] args) {
        Observable.createAsync(requests ->
            getCount().flatMap(max ->
                requests
                    .scan(new State(), State::next)
                    .takeWhile(state -> state.start < max)
                    .concatMap(Main::getPage)));
    }

    private static class State {
        long start;
        long amount;

        public State next(long n) {
            State next = new State();
            next.start = this.start + amount;
            next.amount = n;
            return next;
        }
    }

    private static Observable<? extends Long> getCount() {
        return null;
    }

    private static Observable<? extends String> getPage(State request) {
        return null;
    }
}

Anyway, if you think this API has any merit we should probably move this off to another issue or PR.

@stealthcode
Author

@benjchristensen Can you comment?

@benjchristensen
Member

I find this API valuable, and it allows batch size and request(n) size to differ, which is exactly what is needed and wanted. Here is example code showing this behavior:

package scratch;

import rx.Observable;
import rx.Observer;
import rx.observables.AsyncOnSubscribe;
import rx.schedulers.Schedulers;

public class TestAsyncOnSubscribe {

  public static void main(String[] args) {

    Observable<String> obs = Observable.create(new AsyncOnSubscribe<Integer, String>() {

      final int batchSize = 50;

      @Override
      protected Integer generateState() {
        return 1;
      }

      @Override
      protected Integer
          next(Integer state, long requested, Observer<Observable<? extends String>> observer) {
        System.out.println("new async batch starting at: " + state + " requested: " + requested);
        observer.onNext(Observable
            .range(state, batchSize)
            .doOnSubscribe(() -> System.out.println("Subscribing to batch starting at: " + state))
            .map(i -> "value_" + i)
            .subscribeOn(Schedulers.computation()));
        return state + batchSize;
      }

    });

    //obs.take(15).toBlocking().forEach(v -> System.out.println(v));

    obs.observeOn(Schedulers.computation()).take(250).toBlocking().forEach(
        v -> System.out.println(v));

  }
}

Here is the output:

new async batch starting at: 1 requested: 128
Subscribing to batch starting at: 1
value_1
value_2
value_3
value_4
value_5
value_6
value_7
value_8
value_9
value_10
value_11
value_12
value_13
value_14
value_15
value_16
value_17
value_18
value_19
value_20
value_21
value_22
value_23
value_24
value_25
value_26
value_27
new async batch starting at: 51 requested: 78
value_28
value_29
value_30
value_31
value_32
value_33
value_34
value_35
value_36
value_37
value_38
value_39
value_40
value_41
value_42
value_43
value_44
value_45
value_46
value_47
value_48
value_49
value_50
Subscribing to batch starting at: 51
value_51
value_52
value_53
value_54
value_55
value_56
value_57
value_58
value_59
value_60
value_61
value_62
value_63
value_64
value_65
value_66
value_67
new async batch starting at: 101 requested: 28
value_68
value_69
value_70
value_71
value_72
value_73
value_74
value_75
value_76
value_77
value_78
value_79
value_80
value_81
value_82
value_83
Subscribing to batch starting at: 101
value_84
value_85
value_86
value_87
value_88
value_89
value_90
value_91
value_92
value_93
value_94
value_95
value_96
new async batch starting at: 151 requested: 96
value_97
value_98
value_99
value_100
value_101
value_102
value_103
value_104
value_105
value_106
value_107
value_108
value_109
value_110
value_111
value_112
value_113
value_114
value_115
value_116
value_117
value_118
value_119
value_120
value_121
value_122
value_123
value_124
value_125
value_126
value_127
value_128
value_129
value_130
value_131
value_132
value_133
value_134
value_135
value_136
value_137
value_138
value_139
value_140
value_141
value_142
value_143
value_144
value_145
value_146
value_147
value_148
value_149
value_150
Subscribing to batch starting at: 151
new async batch starting at: 201 requested: 46
value_151
value_152
value_153
value_154
value_155
value_156
value_157
value_158
value_159
value_160
value_161
value_162
value_163
value_164
value_165
value_166
value_167
value_168
value_169
value_170
value_171
value_172
value_173
value_174
value_175
value_176
value_177
value_178
value_179
value_180
value_181
value_182
value_183
value_184
value_185
value_186
value_187
value_188
value_189
value_190
value_191
value_192
new async batch starting at: 251 requested: 96
value_193
value_194
value_195
value_196
value_197
value_198
value_199
value_200
Subscribing to batch starting at: 201
Subscribing to batch starting at: 251
new async batch starting at: 301 requested: 46
value_201
value_202
value_203
value_204
value_205
value_206
value_207
value_208
value_209
value_210
value_211
value_212
value_213
value_214
value_215
value_216
value_217
value_218
value_219
value_220
value_221
value_222
value_223
value_224
value_225
value_226
value_227
Subscribing to batch starting at: 301
value_228
value_229
value_230
value_231
value_232
value_233
value_234
value_235
value_236
value_237
value_238
value_239
value_240
value_241
value_242
value_243
value_244
value_245
value_246
value_247
value_248
value_249
value_250

I'm using 250 even though it makes the output longer so that it is beyond the 128 that observeOn defaults to.

Note how this correctly does the following:

  • batch size is at 50 so each async call is a fixed size as per the developer requirements
  • request(n) size is independent of batch size (128, and different over time as observeOn invokes it)
  • it correctly shows when next is invoked to create a new batch, but it doesn't actually subscribe to it until the correct time when the previous batch is completed, thereby decoupling the request(n) and batch production
  • it guarantees ordering across async batches

One possible change would be to not invoke next eagerly whenever request(n) occurs, though in some use cases that may be wanted. I can argue it both ways.

And if Long.MAX_VALUE is requested? It behaves correctly by executing batches of 50 one after another:

new async batch starting at: 60051 requested: 9223372036854715757
Subscribing to batch starting at: 60051

This API is useful and very powerful when composing request(n) against a batched data source, and I support promoting it to @Beta.

@Rabajaba

What if the returned page has a different size than requested? Say request(50) is called and you decided to load data in chunks of 20 items, but your RPC call responds with 10.
A real-life use case is reading a dataset with offset/limit set on the internal SQL query while grouping it outside. It means that every page has <= N values, and you can only tell the dataset is exhausted when a page comes back empty.

This is one of the reasons I created https://github.com/Rabajaba/rx-utils PortionObservable. (Maybe it will be helpful here.)

@benjchristensen
Member

Why does it return 10 when 20 is requested? If it's because it's the end, that's fine, as it should then cause termination. If the server can't actually respond deterministically, then it seems that using offset/limit is the wrong approach.

Your PortionObservable solution looks interesting. I browsed the code briefly, but not enough to grok the tradeoffs. Would be interesting for you and @stealthcode to compare solutions and see if AsyncOnSubscribe should change in any way before promoting it.

@Rabajaba

@benjchristensen Thanks for the feedback.
It returns 10 and not 20 in a real-life use case. Imagine a large RDBMS table where you have to read data grouped on some field. If you do select ... group by <some set of columns> limit 100 offset 100 you may not hit indexes, because the "group by" clause may be complex (e.g. involving several fields from different tables). That's why it's much faster to apply limit/offset to the initial table and do the grouping afterwards, which results in less data being returned.
This use case is pretty rare, but it took me a few months to code it properly in PortionObservable, which is why I'm asking whether this use case can be covered here.

@stealthcode One feature I haven't implemented yet: parallel loading of pages while results are still emitted in the correct order. Use case: you have a slow external API which never responds faster than 10 seconds, no matter what your request parameters are. Loading pages in parallel significantly improves performance.
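
Roughly what I mean is something like this (loadPage, pageCount and process are placeholders); concatMapEager subscribes to several pages at once but still emits their results strictly in page order:

Observable.range(0, pageCount)
    .concatMapEager(page -> loadPage(page).subscribeOn(Schedulers.io()))
    .subscribe(row -> process(row));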

Let me create another implementation of PortionObservable using AsyncOnSubscribe and see whether my unit tests still pass.

@akarnokd akarnokd added this to the 1.2 milestone Jun 19, 2016
@akarnokd
Member

Ben's use case, which disregards the request amount in the generator callback, can be written with SyncOnSubscribe and flatMap limited to maxConcurrency == 1:

Observable.create(
    SyncOnSubscribe.<...>createSingleState(
        () -> 1, 
        (s, o) -> o.onNext(Observable.range(1, 50))
    )
)
.flatMap(o -> o, 1)
.subscribe(...)

flatMap will request 1 and request another only after the previously generated inner Observable completes.

If you want to pre-generate these sources, use concatMap. If you want to prefetch the inner sources as well but keep their total order, use concatMapEager. Also apply observeOn, doOnNext, etc. as you see fit.
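
For reference, here is a filled-in, runnable version of the sketch above (the take(3) is only there to bound the otherwise endless stream of pages for the demo):

import rx.Observable;
import rx.observables.SyncOnSubscribe;

public class SyncPagingDemo {
    public static void main(String[] args) {
        Observable.create(
            SyncOnSubscribe.<Integer, Observable<Integer>>createSingleState(
                () -> 1,
                (state, observer) -> observer.onNext(Observable.range(1, 50))
            )
        )
        .take(3)            // three pages of 50 values each
        .flatMap(o -> o, 1) // at most one inner page subscribed at a time
        .subscribe(System.out::println);
    }
}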

@akarnokd akarnokd changed the title Upgrading AsyncOnSubscribe from Experimental to Beta 1.x: Upgrading AsyncOnSubscribe from Experimental to Beta Jun 19, 2016
@akarnokd
Member

akarnokd commented Jun 22, 2016

If I remember correctly, API promotion happened after a vote, before the minor version changed (0.x -> 1.0, 1.0 -> 1.1).

If you wish, we can vote now @ReactiveX/rxjava-committers or just before 1.2. If voted for now, after a rebase, this can be merged and won't cause you problems with the ongoing cleanup work. If this can wait till 1.2, you can close the PR.

@akarnokd
Member

Closing as not mergeable. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@akarnokd akarnokd closed this Jun 25, 2016