Skip to content

Commit 034c68f

Browse files
committed
More tests for Publishers
1 parent 3fddb8b commit 034c68f

File tree

2 files changed

+163
-29
lines changed

2 files changed

+163
-29
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
790790
V value = entry.getValue();
791791

792792
Object callContext = callContextByKey.get(key);
793-
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
793+
List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());
794794

795795
onNextValue(key, value, callContext, futures);
796796

src/test/java/org/dataloader/DataLoaderTest.java

+162-28
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.dataloader;
1818

19+
import org.awaitility.Duration;
1920
import org.dataloader.fixtures.CustomCacheMap;
2021
import org.dataloader.fixtures.JsonObject;
2122
import org.dataloader.fixtures.TestKit;
2223
import org.dataloader.fixtures.User;
2324
import org.dataloader.fixtures.UserManager;
2425
import org.dataloader.impl.CompletableFutureKit;
26+
import org.dataloader.impl.DataLoaderAssertionException;
2527
import org.junit.jupiter.api.Named;
2628
import org.junit.jupiter.api.Test;
2729
import org.junit.jupiter.params.ParameterizedTest;
@@ -35,6 +37,7 @@
3537
import java.util.List;
3638
import java.util.Map;
3739
import java.util.Optional;
40+
import java.util.Set;
3841
import java.util.concurrent.CompletableFuture;
3942
import java.util.concurrent.CompletionStage;
4043
import java.util.concurrent.ExecutionException;
@@ -47,6 +50,7 @@
4750
import static java.util.Arrays.asList;
4851
import static java.util.Collections.emptyList;
4952
import static java.util.Collections.singletonList;
53+
import static java.util.concurrent.CompletableFuture.*;
5054
import static org.awaitility.Awaitility.await;
5155
import static org.dataloader.DataLoaderFactory.newDataLoader;
5256
import static org.dataloader.DataLoaderFactory.newMappedDataLoader;
@@ -104,7 +108,7 @@ public void basic_map_batch_loading() {
104108
mapOfResults.put(k, k);
105109
}
106110
});
107-
return CompletableFuture.completedFuture(mapOfResults);
111+
return completedFuture(mapOfResults);
108112
};
109113
DataLoader<String, String> loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader);
110114

@@ -424,7 +428,7 @@ public void should_Allow_priming_the_cache_with_a_future(TestDataLoaderFactory f
424428
List<Collection<String>> loadCalls = new ArrayList<>();
425429
DataLoader<String, String> identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls);
426430

427-
DataLoader<String, String> dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A"));
431+
DataLoader<String, String> dlFluency = identityLoader.prime("A", completedFuture("A"));
428432
assertThat(dlFluency, equalTo(identityLoader));
429433

430434
CompletableFuture<String> future1 = identityLoader.load("A");
@@ -992,7 +996,7 @@ public void batches_multiple_requests_with_max_batch_size(TestDataLoaderFactory
992996

993997
identityLoader.dispatch();
994998

995-
CompletableFuture.allOf(f1, f2, f3).join();
999+
allOf(f1, f2, f3).join();
9961000

9971001
assertThat(f1.join(), equalTo(1));
9981002
assertThat(f2.join(), equalTo(2));
@@ -1035,13 +1039,13 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa
10351039

10361040
AtomicBoolean v4Called = new AtomicBoolean();
10371041

1038-
CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> {
1042+
supplyAsync(nullValue).thenAccept(v1 -> {
10391043
identityLoader.load("a");
1040-
CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> {
1044+
supplyAsync(nullValue).thenAccept(v2 -> {
10411045
identityLoader.load("b");
1042-
CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> {
1046+
supplyAsync(nullValue).thenAccept(v3 -> {
10431047
identityLoader.load("c");
1044-
CompletableFuture.supplyAsync(nullValue).thenAccept(
1048+
supplyAsync(nullValue).thenAccept(
10451049
v4 -> {
10461050
identityLoader.load("d");
10471051
v4Called.set(true);
@@ -1058,12 +1062,68 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa
10581062
singletonList(asList("a", "b", "c", "d"))));
10591063
}
10601064

1065+
@ParameterizedTest
1066+
@MethodSource("dataLoaderFactories")
1067+
public void should_blowup_after_N_keys(TestDataLoaderFactory factory) {
1068+
if (!(factory instanceof TestReactiveDataLoaderFactory)) {
1069+
return;
1070+
}
1071+
//
1072+
// if we blow up after emitting N keys, the N keys should work but the rest of the keys
1073+
// should be exceptional
1074+
DataLoader<Integer, Integer> identityLoader = ((TestReactiveDataLoaderFactory) factory).idLoaderBlowsUpsAfterN(3, new DataLoaderOptions(), new ArrayList<>());
1075+
CompletableFuture<Integer> cf1 = identityLoader.load(1);
1076+
CompletableFuture<Integer> cf2 = identityLoader.load(2);
1077+
CompletableFuture<Integer> cf3 = identityLoader.load(3);
1078+
CompletableFuture<Integer> cf4 = identityLoader.load(4);
1079+
CompletableFuture<Integer> cf5 = identityLoader.load(5);
1080+
identityLoader.dispatch();
1081+
await().until(cf5::isDone);
1082+
1083+
assertThat(cf1.join(), equalTo(1));
1084+
assertThat(cf2.join(), equalTo(2));
1085+
assertThat(cf3.join(), equalTo(3));
1086+
assertThat(cf4.isCompletedExceptionally(), is(true));
1087+
assertThat(cf5.isCompletedExceptionally(), is(true));
1088+
1089+
}
1090+
1091+
@ParameterizedTest
1092+
@MethodSource("dataLoaderFactories")
1093+
public void should_assert_values_size_equals_key_size(TestDataLoaderFactory factory) {
1094+
//
1095+
// what happens if we want 4 values but are only given 2 back say
1096+
//
1097+
DataLoader<String, String> identityLoader = factory.onlyReturnsNValues(2, new DataLoaderOptions(), new ArrayList<>());
1098+
CompletableFuture<String> cf1 = identityLoader.load("A");
1099+
CompletableFuture<String> cf2 = identityLoader.load("B");
1100+
CompletableFuture<String> cf3 = identityLoader.load("C");
1101+
CompletableFuture<String> cf4 = identityLoader.load("D");
1102+
identityLoader.dispatch();
1103+
1104+
await().atMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> cf1.isDone() && cf2.isDone() && cf3.isDone() && cf4.isDone());
1105+
1106+
if (factory instanceof ListDataLoaderFactory | factory instanceof PublisherDataLoaderFactory) {
1107+
assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class));
1108+
assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class));
1109+
assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class));
1110+
assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class));
1111+
} else {
1112+
// with the maps it's ok to have fewer results
1113+
assertThat(cf1.join(), equalTo("A"));
1114+
assertThat(cf2.join(), equalTo("B"));
1115+
assertThat(cf3.join(), equalTo(null));
1116+
assertThat(cf4.join(), equalTo(null));
1117+
}
1118+
1119+
}
1120+
10611121
@Test
10621122
public void can_call_a_loader_from_a_loader() throws Exception {
10631123
List<Collection<String>> deepLoadCalls = new ArrayList<>();
10641124
DataLoader<String, String> deepLoader = newDataLoader(keys -> {
10651125
deepLoadCalls.add(keys);
1066-
return CompletableFuture.completedFuture(keys);
1126+
return completedFuture(keys);
10671127
});
10681128

10691129
List<Collection<String>> aLoadCalls = new ArrayList<>();
@@ -1083,7 +1143,7 @@ public void can_call_a_loader_from_a_loader() throws Exception {
10831143
CompletableFuture<String> b1 = bLoader.load("B1");
10841144
CompletableFuture<String> b2 = bLoader.load("B2");
10851145

1086-
CompletableFuture.allOf(
1146+
allOf(
10871147
aLoader.dispatch(),
10881148
deepLoader.dispatch(),
10891149
bLoader.dispatch(),
@@ -1109,11 +1169,10 @@ public void can_call_a_loader_from_a_loader() throws Exception {
11091169
public void should_allow_composition_of_data_loader_calls() {
11101170
UserManager userManager = new UserManager();
11111171

1112-
BatchLoader<Long, User> userBatchLoader = userIds -> CompletableFuture
1113-
.supplyAsync(() -> userIds
1114-
.stream()
1115-
.map(userManager::loadUserById)
1116-
.collect(Collectors.toList()));
1172+
BatchLoader<Long, User> userBatchLoader = userIds -> supplyAsync(() -> userIds
1173+
.stream()
1174+
.map(userManager::loadUserById)
1175+
.collect(Collectors.toList()));
11171176
DataLoader<Long, User> userLoader = newDataLoader(userBatchLoader);
11181177

11191178
AtomicBoolean gandalfCalled = new AtomicBoolean(false);
@@ -1160,17 +1219,26 @@ private static Stream<Arguments> dataLoaderFactories() {
11601219

11611220
public interface TestDataLoaderFactory {
11621221
<K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls);
1222+
11631223
<K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls);
1224+
11641225
<K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls);
1226+
11651227
DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls);
1228+
1229+
DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls);
1230+
}
1231+
1232+
public interface TestReactiveDataLoaderFactory {
1233+
<K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls);
11661234
}
11671235

11681236
private static class ListDataLoaderFactory implements TestDataLoaderFactory {
11691237
@Override
11701238
public <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls) {
11711239
return newDataLoader(keys -> {
11721240
loadCalls.add(new ArrayList<>(keys));
1173-
return CompletableFuture.completedFuture(keys);
1241+
return completedFuture(keys);
11741242
}, options);
11751243
}
11761244

@@ -1189,7 +1257,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options
11891257
loadCalls.add(new ArrayList<>(keys));
11901258

11911259
List<Object> errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList());
1192-
return CompletableFuture.completedFuture(errors);
1260+
return completedFuture(errors);
11931261
}, options);
11941262
}
11951263

@@ -1206,7 +1274,15 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions o
12061274
errors.add(new IllegalStateException("Error"));
12071275
}
12081276
}
1209-
return CompletableFuture.completedFuture(errors);
1277+
return completedFuture(errors);
1278+
}, options);
1279+
}
1280+
1281+
@Override
1282+
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
1283+
return newDataLoader(keys -> {
1284+
loadCalls.add(new ArrayList<>(keys));
1285+
return completedFuture(keys.subList(0, N));
12101286
}, options);
12111287
}
12121288
}
@@ -1220,7 +1296,7 @@ public <K> DataLoader<K, K> idLoader(
12201296
loadCalls.add(new ArrayList<>(keys));
12211297
Map<K, K> map = new HashMap<>();
12221298
keys.forEach(k -> map.put(k, k));
1223-
return CompletableFuture.completedFuture(map);
1299+
return completedFuture(map);
12241300
}, options);
12251301
}
12261302

@@ -1239,7 +1315,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
12391315
loadCalls.add(new ArrayList<>(keys));
12401316
Map<K, Object> errorByKey = new HashMap<>();
12411317
keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error")));
1242-
return CompletableFuture.completedFuture(errorByKey);
1318+
return completedFuture(errorByKey);
12431319
}, options);
12441320
}
12451321

@@ -1257,16 +1333,28 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
12571333
errorByKey.put(key, new IllegalStateException("Error"));
12581334
}
12591335
}
1260-
return CompletableFuture.completedFuture(errorByKey);
1336+
return completedFuture(errorByKey);
1337+
}, options);
1338+
}
1339+
1340+
@Override
1341+
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
1342+
return newMappedDataLoader(keys -> {
1343+
loadCalls.add(new ArrayList<>(keys));
1344+
1345+
Map<String, String> collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap(
1346+
k -> k, v -> v
1347+
));
1348+
return completedFuture(collect);
12611349
}, options);
12621350
}
12631351
}
12641352

1265-
private static class PublisherDataLoaderFactory implements TestDataLoaderFactory {
1353+
private static class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
12661354

12671355
@Override
12681356
public <K> DataLoader<K, K> idLoader(
1269-
DataLoaderOptions options, List<Collection<K>> loadCalls) {
1357+
DataLoaderOptions options, List<Collection<K>> loadCalls) {
12701358
return newPublisherDataLoader((keys, subscriber) -> {
12711359
loadCalls.add(new ArrayList<>(keys));
12721360
Flux.fromIterable(keys).subscribe(subscriber);
@@ -1283,7 +1371,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col
12831371

12841372
@Override
12851373
public <K> DataLoader<K, Object> idLoaderAllExceptions(
1286-
DataLoaderOptions options, List<Collection<K>> loadCalls) {
1374+
DataLoaderOptions options, List<Collection<K>> loadCalls) {
12871375
return newPublisherDataLoaderWithTry((keys, subscriber) -> {
12881376
loadCalls.add(new ArrayList<>(keys));
12891377
Stream<Try<Object>> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error")));
@@ -1293,7 +1381,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
12931381

12941382
@Override
12951383
public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
1296-
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
1384+
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
12971385
return newPublisherDataLoaderWithTry((keys, subscriber) -> {
12981386
loadCalls.add(new ArrayList<>(keys));
12991387

@@ -1308,13 +1396,36 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
13081396
Flux.fromIterable(errors).subscribe(subscriber);
13091397
}, options);
13101398
}
1399+
1400+
@Override
1401+
public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
1402+
return newPublisherDataLoader((keys, subscriber) -> {
1403+
loadCalls.add(new ArrayList<>(keys));
1404+
1405+
List<K> nKeys = keys.subList(0, N);
1406+
Flux<K> subFlux = Flux.fromIterable(nKeys);
1407+
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
1408+
.subscribe(subscriber);
1409+
}, options);
1410+
}
1411+
1412+
@Override
1413+
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
1414+
return newPublisherDataLoader((keys, subscriber) -> {
1415+
loadCalls.add(new ArrayList<>(keys));
1416+
1417+
List<String> nKeys = keys.subList(0, N);
1418+
Flux.fromIterable(nKeys)
1419+
.subscribe(subscriber);
1420+
}, options);
1421+
}
13111422
}
13121423

1313-
private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory {
1424+
private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
13141425

13151426
@Override
13161427
public <K> DataLoader<K, K> idLoader(
1317-
DataLoaderOptions options, List<Collection<K>> loadCalls) {
1428+
DataLoaderOptions options, List<Collection<K>> loadCalls) {
13181429
return newMappedPublisherDataLoader((keys, subscriber) -> {
13191430
loadCalls.add(new ArrayList<>(keys));
13201431
Map<K, K> map = new HashMap<>();
@@ -1333,7 +1444,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col
13331444

13341445
@Override
13351446
public <K> DataLoader<K, Object> idLoaderAllExceptions(
1336-
DataLoaderOptions options, List<Collection<K>> loadCalls) {
1447+
DataLoaderOptions options, List<Collection<K>> loadCalls) {
13371448
return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
13381449
loadCalls.add(new ArrayList<>(keys));
13391450
Stream<Map.Entry<K, Try<Object>>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error"))));
@@ -1343,7 +1454,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
13431454

13441455
@Override
13451456
public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
1346-
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
1457+
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
13471458
return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
13481459
loadCalls.add(new ArrayList<>(keys));
13491460

@@ -1358,6 +1469,29 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
13581469
Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber);
13591470
}, options);
13601471
}
1472+
1473+
@Override
1474+
public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
1475+
return newMappedPublisherDataLoader((keys, subscriber) -> {
1476+
loadCalls.add(new ArrayList<>(keys));
1477+
1478+
List<K> nKeys = keys.subList(0, N);
1479+
Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
1480+
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
1481+
.subscribe(subscriber);
1482+
}, options);
1483+
}
1484+
1485+
@Override
1486+
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
1487+
return newMappedPublisherDataLoader((keys, subscriber) -> {
1488+
loadCalls.add(new ArrayList<>(keys));
1489+
1490+
List<String> nKeys = keys.subList(0, N);
1491+
Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
1492+
.subscribe(subscriber);
1493+
}, options);
1494+
}
13611495
}
13621496

13631497
private static class ThrowingCacheMap extends CustomCacheMap {

0 commit comments

Comments
 (0)