Skip to content

Commit 8df8b2f

Browse files
authored
Merge pull request #161 from graphql-java/tweak-reactive-docs
Tweaked readme
2 parents d44070a + f759b34 commit 8df8b2f

File tree

3 files changed

+27
-13
lines changed

3 files changed

+27
-13
lines changed

README.md

+17-6
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ For example, let's assume you want to load users from a database, you could prob
288288

289289
### Returning a stream of results from your batch publisher
290290

291-
It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.
291+
It may be that your batch loader function can use a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.
292292

293293
For example, let's say you wanted to load many users from a service without forcing the service to load all
294294
users into its memory (which may exert considerable pressure on it).
@@ -299,29 +299,37 @@ A `org.dataloader.BatchPublisher` may be used to load this data:
299299
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
300300
@Override
301301
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
302-
userManager.publishUsersById(userIds, userSubscriber);
302+
Publisher<User> userResults = userManager.streamUsersById(userIds);
303+
userResults.subscribe(userSubscriber);
303304
}
304305
};
305306
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);
306307

307308
// ...
308309
```
309310

310-
Rather than waiting for all values to be returned, this `DataLoader` will complete
311+
Rather than waiting for all user values to be returned on one batch, this `DataLoader` will complete
311312
the `CompletableFuture<User>` returned by `Dataloader#load(Long)` as each value is
312-
processed.
313+
published.
314+
315+
This pattern means that data loader values can (in theory) be satisfied more quickly than if we wait for
316+
all results in the batch to be retrieved and hence the overall result may finish more quickly.
313317

314318
If an exception is thrown, the remaining futures yet to be completed are completed
315319
exceptionally.
316320

317321
You *MUST* ensure that the values are streamed in the same order as the keys provided,
318322
with the same cardinality (i.e. the number of values must match the number of keys).
323+
319324
Failing to do so will result in incorrect data being returned from `DataLoader#load`.
320325

326+
`BatchPublisher` is the reactive version of `BatchLoader`.
327+
321328

322329
### Returning a mapped stream of results from your batch publisher
323330

324-
Your publisher may not necessarily return values in the same order in which it processes keys.
331+
Your publisher may not necessarily return values in the same order in which it processes keys and it
332+
may not be able to find a value for each key presented.
325333

326334
For example, let's say your batch publisher function loads user data which is spread across shards,
327335
with some shards responding more quickly than others.
@@ -332,7 +340,8 @@ In instances like these, `org.dataloader.MappedBatchPublisher` can be used.
332340
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
333341
@Override
334342
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
335-
userManager.publishUsersById(userIds, userEntrySubscriber);
343+
Publisher<Map.Entry<Long, User>> userEntries = userManager.streamUsersById(userIds);
344+
userEntries.subscribe(userEntrySubscriber);
336345
}
337346
};
338347
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);
@@ -346,6 +355,8 @@ exceptionally.
346355
Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys,
347356
or even the same number of values.
348357

358+
`MappedBatchPublisher` is the reactive version of `MappedBatchLoader`.
359+
349360
### Error object is not a thing in a type safe Java world
350361

351362
In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected

src/test/java/ReadmeExamples.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.dataloader.scheduler.BatchLoaderScheduler;
1818
import org.dataloader.stats.Statistics;
1919
import org.dataloader.stats.ThreadLocalStatisticsCollector;
20+
import org.reactivestreams.Publisher;
2021
import org.reactivestreams.Subscriber;
2122

2223
import java.time.Duration;
@@ -194,7 +195,8 @@ private void batchPublisher() {
194195
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
195196
@Override
196197
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
197-
userManager.publishUsersById(userIds, userSubscriber);
198+
Publisher<User> userResults = userManager.streamUsersById(userIds);
199+
userResults.subscribe(userSubscriber);
198200
}
199201
};
200202
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);
@@ -204,7 +206,8 @@ private void mappedBatchPublisher() {
204206
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
205207
@Override
206208
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
207-
userManager.publishUsersById(userIds, userEntrySubscriber);
209+
Publisher<Map.Entry<Long, User>> userEntries = userManager.streamUsersById(userIds);
210+
userEntries.subscribe(userEntrySubscriber);
208211
}
209212
};
210213
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);

src/test/java/org/dataloader/fixtures/UserManager.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.dataloader.fixtures;
22

3-
import org.reactivestreams.Subscriber;
3+
import org.reactivestreams.Publisher;
44
import reactor.core.publisher.Flux;
55

66
import java.util.HashMap;
@@ -55,12 +55,12 @@ public List<User> loadUsersById(List<Long> userIds) {
5555
return userIds.stream().map(this::loadUserById).collect(Collectors.toList());
5656
}
5757

58-
public void publishUsersById(List<Long> userIds, Subscriber<? super User> userSubscriber) {
59-
Flux.fromIterable(loadUsersById(userIds)).subscribe(userSubscriber);
58+
public Publisher<User> streamUsersById(List<Long> userIds) {
59+
return Flux.fromIterable(loadUsersById(userIds));
6060
}
6161

62-
public void publishUsersById(Set<Long> userIds, Subscriber<? super Map.Entry<Long, User>> userEntrySubscriber) {
63-
Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()).subscribe(userEntrySubscriber);
62+
public Publisher<Map.Entry<Long, User>> streamUsersById(Set<Long> userIds) {
63+
return Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet());
6464
}
6565

6666
public Map<Long, User> loadMapOfUsersByIds(SecurityCtx callCtx, Set<Long> userIds) {

0 commit comments

Comments
 (0)