Skip to content

Adds a predicate to DataLoaderRegistry and a per dataloader map of pedicates is also possible #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 8, 2023
19 changes: 12 additions & 7 deletions src/main/java/org/dataloader/DataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.function.Function;

/**
* This allows data loaders to be registered together into a single place so
* This allows data loaders to be registered together into a single place, so
* they can be dispatched as one. It also allows you to retrieve data loaders by
* name from a central place
*/
Expand All @@ -25,7 +25,7 @@ public class DataLoaderRegistry {
public DataLoaderRegistry() {
}

private DataLoaderRegistry(Builder builder) {
protected DataLoaderRegistry(Builder<?> builder) {
this.dataLoaders.putAll(builder.dataLoaders);
}

Expand Down Expand Up @@ -179,10 +179,15 @@ public static Builder newRegistry() {
return new Builder();
}

public static class Builder {
public static class Builder<B extends Builder<B>> {

private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();

protected B self() {
//noinspection unchecked
return (B) this;
}

/**
* This will register a new dataloader
*
Expand All @@ -191,9 +196,9 @@ public static class Builder {
*
* @return this builder for a fluent pattern
*/
public Builder register(String key, DataLoader<?, ?> dataLoader) {
public B register(String key, DataLoader<?, ?> dataLoader) {
dataLoaders.put(key, dataLoader);
return this;
return self();
}

/**
Expand All @@ -204,9 +209,9 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
*
* @return this builder for a fluent pattern
*/
public Builder registerAll(DataLoaderRegistry otherRegistry) {
public B registerAll(DataLoaderRegistry otherRegistry) {
dataLoaders.putAll(otherRegistry.dataLoaders);
return this;
return self();
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/annotations/GuardedBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
public @interface GuardedBy {

/**
* The lock that should be held.
* @return The lock that should be held.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopped a javadoc warning

*/
String value();
}
10 changes: 10 additions & 0 deletions src/main/java/org/dataloader/registries/DispatchPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@
*/
@FunctionalInterface
public interface DispatchPredicate {

/**
* A predicate that always returns true
*/
DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true;
/**
* A predicate that always returns false
*/
DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false;

/**
* This predicate tests whether the data loader should be dispatched or not.
*
Expand Down
160 changes: 132 additions & 28 deletions src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import org.dataloader.annotations.ExperimentalApi;

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,17 +30,19 @@
@ExperimentalApi
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {

private final ScheduledExecutorService scheduledExecutorService;
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
private final DispatchPredicate dispatchPredicate;
private final ScheduledExecutorService scheduledExecutorService;
private final Duration schedule;
private volatile boolean closed;

private ScheduledDataLoaderRegistry(Builder builder) {
this.dataLoaders.putAll(builder.dataLoaders);
super(builder);
this.scheduledExecutorService = builder.scheduledExecutorService;
this.dispatchPredicate = builder.dispatchPredicate;
this.schedule = builder.schedule;
this.closed = false;
this.dispatchPredicate = builder.dispatchPredicate;
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
}

/**
Expand All @@ -57,6 +60,86 @@ public Duration getScheduleDuration() {
return schedule;
}

/**
* This will combine all the current data loaders in this registry and all the data loaders from the specified registry
* and return a new combined registry
*
* @param registry the registry to combine into this registry
*
* @return a new combined registry
*/
public DataLoaderRegistry combine(DataLoaderRegistry registry) {
Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry()
.dispatchPredicate(this.dispatchPredicate);
combinedBuilder.registerAll(this);
combinedBuilder.registerAll(registry);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth checking here, or inside registerAll if the other registry is also a ScheduledDataLoaderRegistry and if so picking up its dispatchPredicates too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked - it does

return combinedBuilder.build();
}


/**
* This will unregister a new dataloader
*
* @param key the key of the data loader to unregister
*
* @return this registry
*/
public ScheduledDataLoaderRegistry unregister(String key) {
DataLoader<?, ?> dataLoader = dataLoaders.remove(key);
if (dataLoader != null) {
dataLoaderPredicates.remove(dataLoader);
}
return this;
}

/**
* @return the current dispatch predicate
*/
public DispatchPredicate getDispatchPredicate() {
return dispatchPredicate;
}
Copy link

@rstoyanchev rstoyanchev Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that a DispatchPredicate can be registered per DataLoader, this should ideally be called defaultDispatchPredicate. However, I understand that would involve renaming, and/or a deprecation and a property with the new name. In the very least, the Javadoc should be updated to indicate that it applies by default, if there isn't already a registration for that DataLoader.

Also, really minor but the field and getter for dispatchPredicate could be ordered after the map of predicates, like it is in the builder to reflect it comes after the other registrations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. I didnt deprecate but I did redocument


/**
* @return a map of data loaders to specific dispatch predicates
*/
public Map<DataLoader<?, ?>, DispatchPredicate> getDataLoaderPredicates() {
return new LinkedHashMap<>(dataLoaderPredicates);
}

/**
* This will register a new dataloader and dispatch predicate associated with that data loader
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate to associate with this data loader
*
* @return this registry
*/
public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
dataLoaders.put(key, dataLoader);
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
return this;
}

/**
* Returns true if the dataloader has a predicate which returned true, OR the overall
* registry predicate returned true.
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
if (dispatchPredicate != null) {
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
return true;
}
}
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
}

@Override
public void dispatchAll() {
dispatchAllWithCount();
Expand All @@ -68,7 +151,7 @@ public int dispatchAllWithCount() {
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
DataLoader<?, ?> dataLoader = entry.getValue();
String key = entry.getKey();
if (dispatchPredicate.test(key, dataLoader)) {
if (shouldDispatch(key, dataLoader)) {
sum += dataLoader.dispatchWithCounts().getKeysCount();
} else {
reschedule(key, dataLoader);
Expand All @@ -77,24 +160,28 @@ public int dispatchAllWithCount() {
return sum;
}


/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
* without testing the predicates
*/
public void dispatchAllImmediately() {
super.dispatchAll();
dispatchAllWithCountImmediately();
}

/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
* without testing the predicates
*
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
*/
public int dispatchAllWithCountImmediately() {
return super.dispatchAllWithCount();
return dataLoaders.values().stream()
.mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount())
.sum();
}


/**
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
* a pre check of the preodicate like {@link #dispatchAll()} would
Expand All @@ -111,7 +198,7 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
}

private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
if (dispatchPredicate.test(key, dataLoader)) {
if (shouldDispatch(key, dataLoader)) {
dataLoader.dispatch();
} else {
reschedule(key, dataLoader);
Expand All @@ -128,52 +215,69 @@ public static Builder newScheduledRegistry() {
return new Builder();
}

public static class Builder {
public static class Builder extends DataLoaderRegistry.Builder<ScheduledDataLoaderRegistry.Builder> {

private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
private Duration schedule = Duration.ofMillis(10);
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();

private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();

private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;

public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
this.scheduledExecutorService = nonNull(executorService);
return this;
return self();
}

public Builder schedule(Duration schedule) {
this.schedule = schedule;
return this;
return self();
}

public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = nonNull(dispatchPredicate);
return this;
}

/**
* This will register a new dataloader
* This will register a new dataloader with a specific {@link DispatchPredicate}
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate
*
* @return this builder for a fluent pattern
*/
public Builder register(String key, DataLoader<?, ?> dataLoader) {
dataLoaders.put(key, dataLoader);
return this;
public Builder register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
register(key, dataLoader);
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
return self();
}

/**
* This will combine together the data loaders in this builder with the ones
* This will combine the data loaders in this builder with the ones
* from a previous {@link DataLoaderRegistry}
*
* @param otherRegistry the previous {@link DataLoaderRegistry}
*
* @return this builder for a fluent pattern
*/
public Builder registerAll(DataLoaderRegistry otherRegistry) {
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
return this;
super.registerAll(otherRegistry);
if (otherRegistry instanceof ScheduledDataLoaderRegistry) {
ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry;
dataLoaderPredicates.putAll(other.dataLoaderPredicates);
}
return self();
}

/**
* This sets a predicate on the {@link DataLoaderRegistry} that will control
* whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
*
* @param dispatchPredicate the predicate
*
* @return this builder for a fluent pattern
*/
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = dispatchPredicate;
return self();
}

/**
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/org/dataloader/fixtures/TestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
import org.dataloader.MappedBatchLoaderWithContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -94,4 +97,12 @@ public static void snooze(int millis) {
public static <T> List<T> sort(Collection<? extends T> collection) {
return collection.stream().sorted().collect(toList());
}

public static <T> Set<T> asSet(T... elements) {
return new LinkedHashSet<>(Arrays.asList(elements));
}

public static <T> Set<T> asSet(Collection<T> elements) {
return new LinkedHashSet<>(elements);
}
}
Loading