Skip to content

refactor: simplify handling of reused event sources #1518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,17 @@ public List<DependentResourceSpec> getDependentResources() {
throw new IllegalArgumentException(
"A DependentResource named '" + name + "' already exists: " + spec);
}

var eventSourceName = dependent.useEventSourceWithName();
eventSourceName = Constants.NO_VALUE_SET.equals(eventSourceName) ? null : eventSourceName;

final var context = "DependentResource of type '" + dependentType.getName() + "'";
spec = new DependentResourceSpec(dependentType, config, name,
Set.of(dependent.dependsOn()),
instantiateIfNotDefault(dependent.readyPostcondition(), Condition.class, context),
instantiateIfNotDefault(dependent.reconcilePrecondition(), Condition.class, context),
instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context),
dependent.provideEventSource());
eventSourceName);
specsMap.put(name, spec);
}

Expand Down Expand Up @@ -287,7 +291,6 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
OnDeleteFilter<? extends HasMetadata> onDeleteFilter = null;
GenericFilter<? extends HasMetadata> genericFilter = null;
ResourceDiscriminator<?, ? extends HasMetadata> resourceDiscriminator = null;
String eventSourceNameToUse = null;
if (kubeDependent != null) {
if (!Arrays.equals(KubernetesDependent.DEFAULT_NAMESPACES,
kubeDependent.namespaces())) {
Expand All @@ -314,14 +317,12 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
resourceDiscriminator =
instantiateIfNotDefault(kubeDependent.resourceDiscriminator(),
ResourceDiscriminator.class, context);
eventSourceNameToUse = Constants.NO_VALUE_SET.equals(kubeDependent.eventSourceToUse()) ? null
: kubeDependent.eventSourceToUse();
}

config =
new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS,
resourceDiscriminator, onAddFilter,
onUpdateFilter, onDeleteFilter, genericFilter, eventSourceNameToUse);
onUpdateFilter, onDeleteFilter, genericFilter);

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void replaceConfig(String name, Object newConfig, DependentResourceSpec<
namedDependentResourceSpecs.put(name,
new DependentResourceSpec<>(current.getDependentResourceClass(), newConfig, name,
current.getDependsOn(), current.getReadyCondition(), current.getReconcileCondition(),
current.getDeletePostCondition(), current.provideEventSource()));
current.getDeletePostCondition(), current.getUseEventSourceWithName().orElse(null)));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -220,7 +220,8 @@ public ControllerConfiguration<R> build() {
KubernetesDependentResourceConfig c) {
return new DependentResourceSpec(spec.getDependentResourceClass(),
c.setNamespaces(namespaces), name, spec.getDependsOn(), spec.getReadyCondition(),
spec.getReconcileCondition(), spec.getDeletePostCondition(), spec.provideEventSource());
spec.getReconcileCondition(), spec.getDeletePostCondition(),
(String) spec.getUseEventSourceWithName().orElse(null));
}

public static <R extends HasMetadata> ControllerConfigurationOverrider<R> override(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ public class DependentResourceSpec<T extends DependentResource<?, ?>, C> {

private final Condition<?, ?> deletePostCondition;

private final boolean provideEventSource;
private final String useEventSourceWithName;

public DependentResourceSpec(Class<T> dependentResourceClass, C dependentResourceConfig,
String name, Set<String> dependsOn, Condition<?, ?> readyCondition,
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition,
boolean provideEventSource) {
String useEventSourceWithName) {
this.dependentResourceClass = dependentResourceClass;
this.dependentResourceConfig = dependentResourceConfig;
this.name = name;
this.dependsOn = dependsOn;
this.readyCondition = readyCondition;
this.reconcileCondition = reconcileCondition;
this.deletePostCondition = deletePostCondition;
this.provideEventSource = provideEventSource;
this.useEventSourceWithName = useEventSourceWithName;
}

public Class<T> getDependentResourceClass() {
Expand Down Expand Up @@ -94,7 +94,7 @@ public Condition getDeletePostCondition() {
return deletePostCondition;
}

public boolean provideEventSource() {
return provideEventSource;
public Optional<String> getUseEventSourceWithName() {
return Optional.ofNullable(useEventSourceWithName);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET;
Expand Down Expand Up @@ -59,11 +60,11 @@
String[] dependsOn() default {};

/**
* Setting this to false means that the event source provided by the dependent resource won't be
* used. This is helpful if more dependent resources created for the same type, and want to share
* a common event source. In that case an event source needs to be explicitly registered.
* Setting here a name of the event source means that dependent resource will use an event source
* registered with that name. So won't create one. This is helpful if more dependent resources
* created for the same type, and want to share a common event source.
*
* @return if the event source (if any) provided by the dependent resource should be used or not.
* @return event source name (if any) provided by the dependent resource should be used.
*/
boolean provideEventSource() default true;
String useEventSourceWithName() default NO_VALUE_SET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ default Optional<ResourceEventSource<R, P>> eventSource(
return Optional.empty();
}

/**
* Calling this method, instructs the implementation to not provide an event source, even if it
* normally does.
*/
void doNotProvideEventSource();


default Optional<R> getSecondaryResource(P primary, Context<P> context) {
return Optional.empty();
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import io.javaoperatorsdk.operator.OperatorException;

public class EventSourceNotFoundException extends OperatorException {

private String eventSourceName;

public EventSourceNotFoundException(String eventSourceName) {
this.eventSourceName = eventSourceName;
}

public String getEventSourceName() {
return eventSourceName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;

public interface EventSourceReferencer<P extends HasMetadata> {

default void useEventSourceWithName(String name) {}

/**
* Throws {@link EventSourceNotFoundException} an exception if the target event source to use is
* not found.
*/
void resolveEventSource(EventSourceRetriever<P> eventSourceRetriever)
throws EventSourceNotFoundException;

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.javaoperatorsdk.operator.processing;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -32,8 +35,9 @@
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceNotFoundException;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult;
Expand Down Expand Up @@ -214,25 +218,40 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {
final var ownSources = provider.prepareEventSources(context);
ownSources.forEach(eventSourceManager::registerEventSource);
}
managedWorkflow
.getDependentResourcesByName().entrySet().stream()
.forEach(drEntry -> {
if (drEntry.getValue() instanceof EventSourceProvider) {
final var provider = (EventSourceProvider) drEntry.getValue();
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(drEntry.getKey(), source);
} else {
Optional<ResourceEventSource> eventSource =
drEntry.getValue().eventSource(context);
eventSource.ifPresent(es -> {
eventSourceManager.registerEventSource(drEntry.getKey(), es);
});
}
});
managedWorkflow.getDependentResourcesByName().entrySet().stream().map(Map.Entry::getValue)
.filter(EventSourceAware.class::isInstance)
.forEach(dr -> ((EventSourceAware) dr)
.selectEventSources(eventSourceManager));

// register created event sources
final var dependentResourcesByName = managedWorkflow.getDependentResourcesByName();
final var size = dependentResourcesByName.size();
if (size > 0) {
dependentResourcesByName.forEach((key, value) -> {
if (value instanceof EventSourceProvider) {
final var provider = (EventSourceProvider) value;
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(key, source);
} else {
Optional<ResourceEventSource> eventSource = value.eventSource(context);
eventSource.ifPresent(es -> eventSourceManager.registerEventSource(key, es));
}
});

// resolve event sources referenced by name for dependents that reuse an existing event source
final Map<String, List<EventSourceReferencer>> unresolvable = new HashMap<>(size);
dependentResourcesByName.values().stream()
.filter(EventSourceReferencer.class::isInstance)
.map(EventSourceReferencer.class::cast)
.forEach(dr -> {
try {
((EventSourceReferencer<P>) dr)
.resolveEventSource(eventSourceManager);
} catch (EventSourceNotFoundException e) {
unresolvable.computeIfAbsent(e.getEventSourceName(), s -> new ArrayList<>()).add(dr);
}
});
if (!unresolvable.isEmpty()) {
throw new IllegalStateException(
"Couldn't resolve referenced EventSources: " + unresolvable);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;

@Ignore
public abstract class AbstractDependentResource<R, P extends HasMetadata>
Expand All @@ -29,7 +27,6 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>
protected Creator<R, P> creator;
protected Updater<R, P> updater;
protected BulkDependentResource<R, P> bulkDependentResource;
private boolean returnEventSource = true;

protected List<ResourceDiscriminator<R, P>> resourceDiscriminator = new ArrayList<>(1);

Expand All @@ -41,23 +38,6 @@ public AbstractDependentResource() {
bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
}

@Override
public void doNotProvideEventSource() {
this.returnEventSource = false;
}

@Override
public Optional<ResourceEventSource<R, P>> eventSource(EventSourceContext<P> eventSourceContext) {
if (!returnEventSource) {
return Optional.empty();
} else {
return Optional.of(provideEventSource(eventSourceContext));
}
}

protected abstract ResourceEventSource<R, P> provideEventSource(
EventSourceContext<P> eventSourceContext);

@Override
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
if (bulk) {
Expand Down Expand Up @@ -239,7 +219,4 @@ protected int lastKnownBulkSize() {
return resourceDiscriminator.size();
}

protected boolean getReturnEventSource() {
return returnEventSource;
}
}
Loading