From 034c68f4ca4faa17cf762230891324b04b3df7be Mon Sep 17 00:00:00 2001 From: bbaker Date: Wed, 22 May 2024 20:25:48 +1000 Subject: [PATCH 1/2] More tests for Publishers --- .../java/org/dataloader/DataLoaderHelper.java | 2 +- .../java/org/dataloader/DataLoaderTest.java | 190 +++++++++++++++--- 2 files changed, 163 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index d01b930..cf3fc6e 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -790,7 +790,7 @@ public synchronized void onNext(Map.Entry entry) { V value = entry.getValue(); Object callContext = callContextByKey.get(key); - List> futures = queuedFuturesByKey.get(key); + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); onNextValue(key, value, callContext, futures); diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index a87c77d..c8d4562 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -16,12 +16,14 @@ package org.dataloader; +import org.awaitility.Duration; import org.dataloader.fixtures.CustomCacheMap; import org.dataloader.fixtures.JsonObject; import org.dataloader.fixtures.TestKit; import org.dataloader.fixtures.User; import org.dataloader.fixtures.UserManager; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.impl.DataLoaderAssertionException; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -35,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -47,6 +50,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static java.util.concurrent.CompletableFuture.*; import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.DataLoaderFactory.newMappedDataLoader; @@ -104,7 +108,7 @@ public void basic_map_batch_loading() { mapOfResults.put(k, k); } }); - return CompletableFuture.completedFuture(mapOfResults); + return completedFuture(mapOfResults); }; DataLoader loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader); @@ -424,7 +428,7 @@ public void should_Allow_priming_the_cache_with_a_future(TestDataLoaderFactory f List> loadCalls = new ArrayList<>(); DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); - DataLoader dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A")); + DataLoader dlFluency = identityLoader.prime("A", completedFuture("A")); assertThat(dlFluency, equalTo(identityLoader)); CompletableFuture future1 = identityLoader.load("A"); @@ -992,7 +996,7 @@ public void batches_multiple_requests_with_max_batch_size(TestDataLoaderFactory identityLoader.dispatch(); - CompletableFuture.allOf(f1, f2, f3).join(); + allOf(f1, f2, f3).join(); assertThat(f1.join(), equalTo(1)); assertThat(f2.join(), equalTo(2)); @@ -1035,13 +1039,13 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa AtomicBoolean v4Called = new AtomicBoolean(); - CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> { + supplyAsync(nullValue).thenAccept(v1 -> { identityLoader.load("a"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> { + supplyAsync(nullValue).thenAccept(v2 -> { identityLoader.load("b"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> { + supplyAsync(nullValue).thenAccept(v3 -> { identityLoader.load("c"); - CompletableFuture.supplyAsync(nullValue).thenAccept( + supplyAsync(nullValue).thenAccept( v4 -> { identityLoader.load("d"); v4Called.set(true); @@ -1058,12 +1062,68 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa singletonList(asList("a", "b", "c", "d")))); } + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_blowup_after_N_keys(TestDataLoaderFactory factory) { + if (!(factory instanceof TestReactiveDataLoaderFactory)) { + return; + } + // + // if we blow up after emitting N keys, the N keys should work but the rest of the keys + // should be exceptional + DataLoader identityLoader = ((TestReactiveDataLoaderFactory) factory).idLoaderBlowsUpsAfterN(3, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load(1); + CompletableFuture cf2 = identityLoader.load(2); + CompletableFuture cf3 = identityLoader.load(3); + CompletableFuture cf4 = identityLoader.load(4); + CompletableFuture cf5 = identityLoader.load(5); + identityLoader.dispatch(); + await().until(cf5::isDone); + + assertThat(cf1.join(), equalTo(1)); + assertThat(cf2.join(), equalTo(2)); + assertThat(cf3.join(), equalTo(3)); + assertThat(cf4.isCompletedExceptionally(), is(true)); + assertThat(cf5.isCompletedExceptionally(), is(true)); + + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_assert_values_size_equals_key_size(TestDataLoaderFactory factory) { + // + // what happens if we want 4 values but are only given 2 back say + // + DataLoader identityLoader = factory.onlyReturnsNValues(2, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load("A"); + CompletableFuture cf2 = identityLoader.load("B"); + CompletableFuture cf3 = identityLoader.load("C"); + CompletableFuture cf4 = identityLoader.load("D"); + identityLoader.dispatch(); + + await().atMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> cf1.isDone() && cf2.isDone() && cf3.isDone() && cf4.isDone()); + + if (factory instanceof ListDataLoaderFactory | factory instanceof PublisherDataLoaderFactory) { + assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else { + // with the maps it's ok to have fewer results + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cf3.join(), equalTo(null)); + assertThat(cf4.join(), equalTo(null)); + } + + } + @Test public void can_call_a_loader_from_a_loader() throws Exception { List> deepLoadCalls = new ArrayList<>(); DataLoader deepLoader = newDataLoader(keys -> { deepLoadCalls.add(keys); - return CompletableFuture.completedFuture(keys); + return completedFuture(keys); }); List> aLoadCalls = new ArrayList<>(); @@ -1083,7 +1143,7 @@ public void can_call_a_loader_from_a_loader() throws Exception { CompletableFuture b1 = bLoader.load("B1"); CompletableFuture b2 = bLoader.load("B2"); - CompletableFuture.allOf( + allOf( aLoader.dispatch(), deepLoader.dispatch(), bLoader.dispatch(), @@ -1109,11 +1169,10 @@ public void can_call_a_loader_from_a_loader() throws Exception { public void should_allow_composition_of_data_loader_calls() { UserManager userManager = new UserManager(); - BatchLoader userBatchLoader = userIds -> CompletableFuture - .supplyAsync(() -> userIds - .stream() - .map(userManager::loadUserById) - .collect(Collectors.toList())); + BatchLoader userBatchLoader = userIds -> supplyAsync(() -> userIds + .stream() + .map(userManager::loadUserById) + .collect(Collectors.toList())); DataLoader userLoader = newDataLoader(userBatchLoader); AtomicBoolean gandalfCalled = new AtomicBoolean(false); @@ -1160,9 +1219,18 @@ private static Stream dataLoaderFactories() { public interface TestDataLoaderFactory { DataLoader idLoader(DataLoaderOptions options, List> loadCalls); + DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); + DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls); + DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls); + + DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls); + } + + public interface TestReactiveDataLoaderFactory { + DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls); } private static class ListDataLoaderFactory implements TestDataLoaderFactory { @@ -1170,7 +1238,7 @@ private static class ListDataLoaderFactory implements TestDataLoaderFactory { public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { return newDataLoader(keys -> { loadCalls.add(new ArrayList<>(keys)); - return CompletableFuture.completedFuture(keys); + return completedFuture(keys); }, options); } @@ -1189,7 +1257,7 @@ public DataLoader idLoaderAllExceptions(DataLoaderOptions options loadCalls.add(new ArrayList<>(keys)); List errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList()); - return CompletableFuture.completedFuture(errors); + return completedFuture(errors); }, options); } @@ -1206,7 +1274,15 @@ public DataLoader idLoaderOddEvenExceptions(DataLoaderOptions o errors.add(new IllegalStateException("Error")); } } - return CompletableFuture.completedFuture(errors); + return completedFuture(errors); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys.subList(0, N)); }, options); } } @@ -1220,7 +1296,7 @@ public DataLoader idLoader( loadCalls.add(new ArrayList<>(keys)); Map map = new HashMap<>(); keys.forEach(k -> map.put(k, k)); - return CompletableFuture.completedFuture(map); + return completedFuture(map); }, options); } @@ -1239,7 +1315,7 @@ public DataLoader idLoaderAllExceptions( loadCalls.add(new ArrayList<>(keys)); Map errorByKey = new HashMap<>(); keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error"))); - return CompletableFuture.completedFuture(errorByKey); + return completedFuture(errorByKey); }, options); } @@ -1257,16 +1333,28 @@ public DataLoader idLoaderOddEvenExceptions( errorByKey.put(key, new IllegalStateException("Error")); } } - return CompletableFuture.completedFuture(errorByKey); + return completedFuture(errorByKey); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + Map collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap( + k -> k, v -> v + )); + return completedFuture(collect); }, options); } } - private static class PublisherDataLoaderFactory implements TestDataLoaderFactory { + private static class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { @Override public DataLoader idLoader( - DataLoaderOptions options, List> loadCalls) { + DataLoaderOptions options, List> loadCalls) { return newPublisherDataLoader((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); Flux.fromIterable(keys).subscribe(subscriber); @@ -1283,7 +1371,7 @@ public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { + DataLoaderOptions options, List> loadCalls) { return newPublisherDataLoaderWithTry((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); Stream> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error"))); @@ -1293,7 +1381,7 @@ public DataLoader idLoaderAllExceptions( @Override public DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { + DataLoaderOptions options, List> loadCalls) { return newPublisherDataLoaderWithTry((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); @@ -1308,13 +1396,36 @@ public DataLoader idLoaderOddEvenExceptions( Flux.fromIterable(errors).subscribe(subscriber); }, options); } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux subFlux = Flux.fromIterable(nKeys); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux.fromIterable(nKeys) + .subscribe(subscriber); + }, options); + } } - private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory { + private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { @Override public DataLoader idLoader( - DataLoaderOptions options, List> loadCalls) { + DataLoaderOptions options, List> loadCalls) { return newMappedPublisherDataLoader((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); Map map = new HashMap<>(); @@ -1333,7 +1444,7 @@ public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { + DataLoaderOptions options, List> loadCalls) { return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); Stream>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error")))); @@ -1343,7 +1454,7 @@ public DataLoader idLoaderAllExceptions( @Override public DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { + DataLoaderOptions options, List> loadCalls) { return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { loadCalls.add(new ArrayList<>(keys)); @@ -1358,6 +1469,29 @@ public DataLoader idLoaderOddEvenExceptions( Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber); }, options); } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)) + .subscribe(subscriber); + }, options); + } } private static class ThrowingCacheMap extends CustomCacheMap { From 8b344dbdba430c2ca56896112c0b9f85a2d01cb4 Mon Sep 17 00:00:00 2001 From: bbaker Date: Thu, 23 May 2024 15:46:21 +1000 Subject: [PATCH 2/2] Now the builds pass - broken out the fixtures --- .../java/org/dataloader/DataLoaderHelper.java | 50 ++- .../java/org/dataloader/DataLoaderTest.java | 343 +++--------------- .../java/org/dataloader/fixtures/TestKit.java | 9 + .../parameterized/ListDataLoaderFactory.java | 79 ++++ .../MappedDataLoaderFactory.java | 95 +++++ .../MappedPublisherDataLoaderFactory.java | 104 ++++++ .../PublisherDataLoaderFactory.java | 100 +++++ .../parameterized/TestDataLoaderFactory.java | 22 ++ .../TestReactiveDataLoaderFactory.java | 11 + 9 files changed, 516 insertions(+), 297 deletions(-) create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index cf3fc6e..edbf348 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,6 +3,7 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.impl.DataLoaderAssertionException; import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; @@ -624,6 +625,15 @@ private static DispatchResult emptyDispatchResult() { return (DispatchResult) EMPTY_DISPATCH_RESULT; } + /********************************************************************************************** + * ******************************************************************************************** + *

+ * The reactive support classes start here + * + * @param for two + ********************************************************************************************** + ********************************************************************************************** + */ private abstract class DataLoaderSubscriberBase implements Subscriber { final CompletableFuture> valuesFuture; @@ -721,6 +731,11 @@ private DataLoaderSubscriber( public synchronized void onNext(V value) { super.onNext(value); + if (idx >= keys.size()) { + // hang on they have given us more values than we asked for in keys + // we cant handle this + return; + } K key = keys.get(idx); Object callContext = callContexts.get(idx); CompletableFuture future = queuedFutures.get(idx); @@ -734,8 +749,16 @@ public synchronized void onNext(V value) { @Override public synchronized void onComplete() { super.onComplete(); - assertResultSize(keys, completedValues); - + if (keys.size() != completedValues.size()) { + // we have more or less values than promised + // we will go through all the outstanding promises and mark those that + // have not finished as failed + for (CompletableFuture queuedFuture : queuedFutures) { + if (!queuedFuture.isDone()) { + queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + } + } + } possiblyClearCacheEntriesOnExceptions(clearCacheKeys); valuesFuture.complete(completedValues); } @@ -748,9 +771,11 @@ public synchronized void onError(Throwable ex) { for (int i = idx; i < queuedFutures.size(); i++) { K key = keys.get(i); CompletableFuture future = queuedFutures.get(i); - future.completeExceptionally(ex); - // clear any cached view of this key because they all failed - dataLoader.clear(key); + if (! future.isDone()) { + future.completeExceptionally(ex); + // clear any cached view of this key because it failed + dataLoader.clear(key); + } } valuesFuture.completeExceptionally(ex); } @@ -794,7 +819,10 @@ public synchronized void onNext(Map.Entry entry) { onNextValue(key, value, callContext, futures); - completedValuesByKey.put(key, value); + // did we have an actual key for this value - ignore it if they send us one outside the key set + if (!futures.isEmpty()) { + completedValuesByKey.put(key, value); + } } @Override @@ -806,6 +834,16 @@ public synchronized void onComplete() { for (K key : keys) { V value = completedValuesByKey.get(key); values.add(value); + + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + for (CompletableFuture future : futures) { + if (! future.isDone()) { + // we have a future that never came back for that key + // but the publisher is done sending in data - it must be null + // e.g. for key X when found no value + future.complete(null); + } + } } valuesFuture.complete(values); } diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index c8d4562..1f748fb 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -19,9 +19,14 @@ import org.awaitility.Duration; import org.dataloader.fixtures.CustomCacheMap; import org.dataloader.fixtures.JsonObject; -import org.dataloader.fixtures.TestKit; import org.dataloader.fixtures.User; import org.dataloader.fixtures.UserManager; +import org.dataloader.fixtures.parameterized.ListDataLoaderFactory; +import org.dataloader.fixtures.parameterized.MappedDataLoaderFactory; +import org.dataloader.fixtures.parameterized.MappedPublisherDataLoaderFactory; +import org.dataloader.fixtures.parameterized.PublisherDataLoaderFactory; +import org.dataloader.fixtures.parameterized.TestDataLoaderFactory; +import org.dataloader.fixtures.parameterized.TestReactiveDataLoaderFactory; import org.dataloader.impl.CompletableFutureKit; import org.dataloader.impl.DataLoaderAssertionException; import org.junit.jupiter.api.Named; @@ -29,7 +34,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import reactor.core.publisher.Flux; import java.util.ArrayList; import java.util.Collection; @@ -37,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -59,7 +62,7 @@ import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; import static org.dataloader.DataLoaderOptions.newOptions; -import static org.dataloader.fixtures.TestKit.futureError; +import static org.dataloader.fixtures.TestKit.areAllDone; import static org.dataloader.fixtures.TestKit.listFrom; import static org.dataloader.impl.CompletableFutureKit.cause; import static org.hamcrest.MatcherAssert.assertThat; @@ -68,6 +71,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.fail; /** * Tests for {@link DataLoader}. @@ -1090,7 +1094,7 @@ public void should_blowup_after_N_keys(TestDataLoaderFactory factory) { @ParameterizedTest @MethodSource("dataLoaderFactories") - public void should_assert_values_size_equals_key_size(TestDataLoaderFactory factory) { + public void when_values_size_are_less_then_key_size(TestDataLoaderFactory factory) { // // what happens if we want 4 values but are only given 2 back say // @@ -1101,13 +1105,19 @@ public void should_assert_values_size_equals_key_size(TestDataLoaderFactory fact CompletableFuture cf4 = identityLoader.load("D"); identityLoader.dispatch(); - await().atMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> cf1.isDone() && cf2.isDone() && cf3.isDone() && cf4.isDone()); + await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4)); - if (factory instanceof ListDataLoaderFactory | factory instanceof PublisherDataLoaderFactory) { + if (factory instanceof ListDataLoaderFactory) { assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class)); assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class)); assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else if (factory instanceof PublisherDataLoaderFactory) { + // some have completed progressively but the other never did + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); } else { // with the maps it's ok to have fewer results assertThat(cf1.join(), equalTo("A")); @@ -1115,7 +1125,34 @@ public void should_assert_values_size_equals_key_size(TestDataLoaderFactory fact assertThat(cf3.join(), equalTo(null)); assertThat(cf4.join(), equalTo(null)); } + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void when_values_size_are_more_then_key_size(TestDataLoaderFactory factory) { + // + // what happens if we want 4 values but only given 6 back say + // + DataLoader identityLoader = factory.idLoaderReturnsTooMany(2, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load("A"); + CompletableFuture cf2 = identityLoader.load("B"); + CompletableFuture cf3 = identityLoader.load("C"); + CompletableFuture cf4 = identityLoader.load("D"); + identityLoader.dispatch(); + await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4)); + + if (factory instanceof ListDataLoaderFactory) { + assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else { + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cf3.join(), equalTo("C")); + assertThat(cf4.join(), equalTo("D")); + } } @Test @@ -1208,6 +1245,14 @@ private static CacheKey getJsonObjectCacheMapFn() { .collect(Collectors.joining()); } + private static class ThrowingCacheMap extends CustomCacheMap { + + @Override + public CompletableFuture get(String key) { + throw new RuntimeException("Cache implementation failed."); + } + } + private static Stream dataLoaderFactories() { return Stream.of( Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())), @@ -1216,289 +1261,5 @@ private static Stream dataLoaderFactories() { Arguments.of(Named.of("Mapped Publisher DataLoader", new MappedPublisherDataLoaderFactory())) ); } - - public interface TestDataLoaderFactory { - DataLoader idLoader(DataLoaderOptions options, List> loadCalls); - - DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); - - DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls); - - DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls); - - DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls); - } - - public interface TestReactiveDataLoaderFactory { - DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls); - } - - private static class ListDataLoaderFactory implements TestDataLoaderFactory { - @Override - public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - return completedFuture(keys); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUps( - DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - return TestKit.futureError(); - }, options); - } - - @Override - public DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - List errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList()); - return completedFuture(errors); - }, options); - } - - @Override - public DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - List errors = new ArrayList<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errors.add(key); - } else { - errors.add(new IllegalStateException("Error")); - } - } - return completedFuture(errors); - }, options); - } - - @Override - public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - return completedFuture(keys.subList(0, N)); - }, options); - } - } - - private static class MappedDataLoaderFactory implements TestDataLoaderFactory { - - @Override - public DataLoader idLoader( - DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader((keys) -> { - loadCalls.add(new ArrayList<>(keys)); - Map map = new HashMap<>(); - keys.forEach(k -> map.put(k, k)); - return completedFuture(map); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader((keys) -> { - loadCalls.add(new ArrayList<>(keys)); - return futureError(); - }, options); - } - - @Override - public DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - Map errorByKey = new HashMap<>(); - keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error"))); - return completedFuture(errorByKey); - }, options); - } - - @Override - public DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - Map errorByKey = new HashMap<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errorByKey.put(key, key); - } else { - errorByKey.put(key, new IllegalStateException("Error")); - } - } - return completedFuture(errorByKey); - }, options); - } - - @Override - public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { - return newMappedDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - Map collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap( - k -> k, v -> v - )); - return completedFuture(collect); - }, options); - } - } - - private static class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { - - @Override - public DataLoader idLoader( - DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Flux.fromIterable(keys).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Flux.error(new IllegalStateException("Error")).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoaderWithTry((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Stream> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error"))); - Flux.fromStream(failures).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoaderWithTry((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - - List> errors = new ArrayList<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errors.add(Try.succeeded(key)); - } else { - errors.add(Try.failed(new IllegalStateException("Error"))); - } - } - Flux.fromIterable(errors).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - - List nKeys = keys.subList(0, N); - Flux subFlux = Flux.fromIterable(nKeys); - subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) - .subscribe(subscriber); - }, options); - } - - @Override - public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { - return newPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - - List nKeys = keys.subList(0, N); - Flux.fromIterable(nKeys) - .subscribe(subscriber); - }, options); - } - } - - private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { - - @Override - public DataLoader idLoader( - DataLoaderOptions options, List> loadCalls) { - return newMappedPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Map map = new HashMap<>(); - keys.forEach(k -> map.put(k, k)); - Flux.fromIterable(map.entrySet()).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { - return newMappedPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Flux.>error(new IllegalStateException("Error")).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { - return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Stream>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error")))); - Flux.fromStream(failures).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { - return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - - Map> errorByKey = new HashMap<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errorByKey.put(key, Try.succeeded(key)); - } else { - errorByKey.put(key, Try.failed(new IllegalStateException("Error"))); - } - } - Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { - return newMappedPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - - List nKeys = keys.subList(0, N); - Flux> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)); - subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) - .subscribe(subscriber); - }, options); - } - - @Override - public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { - return newMappedPublisherDataLoader((keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - - List nKeys = keys.subList(0, N); - Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)) - .subscribe(subscriber); - }, options); - } - } - - private static class ThrowingCacheMap extends CustomCacheMap { - @Override - public CompletableFuture get(String key) { - throw new RuntimeException("Cache implementation failed."); - } - } } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index adffb06..c22988d 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -131,4 +131,13 @@ public static Set asSet(T... elements) { public static Set asSet(Collection elements) { return new LinkedHashSet<>(elements); } + + public static boolean areAllDone(CompletableFuture... cfs) { + for (CompletableFuture cf : cfs) { + if (! cf.isDone()) { + return false; + } + } + return true; + } } diff --git a/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java new file mode 100644 index 0000000..ee1f1d7 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java @@ -0,0 +1,79 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.fixtures.TestKit; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.dataloader.DataLoaderFactory.newDataLoader; + +public class ListDataLoaderFactory implements TestDataLoaderFactory { + @Override + public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps( + DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return TestKit.futureError(); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList()); + return completedFuture(errors); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List errors = new ArrayList<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errors.add(key); + } else { + errors.add(new IllegalStateException("Error")); + } + } + return completedFuture(errors); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys.subList(0, N)); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + return completedFuture(l); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java new file mode 100644 index 0000000..8f41441 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java @@ -0,0 +1,95 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.fixtures.TestKit.futureError; + +public class MappedDataLoaderFactory implements TestDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader((keys) -> { + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + return completedFuture(map); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader((keys) -> { + loadCalls.add(new ArrayList<>(keys)); + return futureError(); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + Map errorByKey = new HashMap<>(); + keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error"))); + return completedFuture(errorByKey); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + Map errorByKey = new HashMap<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errorByKey.put(key, key); + } else { + errorByKey.put(key, new IllegalStateException("Error")); + } + } + return completedFuture(errorByKey); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + Map collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap( + k -> k, v -> v + )); + return completedFuture(collect); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Map collect = l.stream().collect(Collectors.toMap( + k -> k, v -> v + )); + return completedFuture(collect); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java new file mode 100644 index 0000000..f5c1ad5 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java @@ -0,0 +1,104 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.Try; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; + +public class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + Flux.fromIterable(map.entrySet()).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.>error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error")))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + Map> errorByKey = new HashMap<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errorByKey.put(key, Try.succeeded(key)); + } else { + errorByKey.put(key, Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Flux.fromIterable(l).map(k -> Map.entry(k, k)) + .subscribe(subscriber); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java new file mode 100644 index 0000000..d75ff38 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java @@ -0,0 +1,100 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.Try; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Stream; + +import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; + +public class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.fromIterable(keys).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error"))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List> errors = new ArrayList<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errors.add(Try.succeeded(key)); + } else { + errors.add(Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errors).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux subFlux = Flux.fromIterable(nKeys); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux.fromIterable(nKeys) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Flux.fromIterable(l) + .subscribe(subscriber); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java new file mode 100644 index 0000000..8c1bc22 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java @@ -0,0 +1,22 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public interface TestDataLoaderFactory { + DataLoader idLoader(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls); + + DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls); + + DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls); +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java new file mode 100644 index 0000000..d45932c --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java @@ -0,0 +1,11 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.Collection; +import java.util.List; + +public interface TestReactiveDataLoaderFactory { + DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls); +}