Skip to content

refactor: simplify handling of reused event sources #1516

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ public List<DependentResourceSpec> getDependentResources() {
Set.of(dependent.dependsOn()),
instantiateIfNotDefault(dependent.readyPostcondition(), Condition.class, context),
instantiateIfNotDefault(dependent.reconcilePrecondition(), Condition.class, context),
instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context));
instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context),
Constants.NO_VALUE_SET.equals(dependent.eventSource()) ? null
: dependent.eventSource());
specsMap.put(name, spec);
}

Expand Down Expand Up @@ -286,13 +288,13 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
OnDeleteFilter<? extends HasMetadata> onDeleteFilter = null;
GenericFilter<? extends HasMetadata> genericFilter = null;
ResourceDiscriminator<?, ? extends HasMetadata> resourceDiscriminator = null;
String eventSourceNameToUse = null;
if (kubeDependent != null) {
if (!Arrays.equals(KubernetesDependent.DEFAULT_NAMESPACES,
kubeDependent.namespaces())) {
namespaces = Set.of(kubeDependent.namespaces());
configuredNS = true;
}

final var fromAnnotation = kubeDependent.labelSelector();
labelSelector = Constants.NO_VALUE_SET.equals(fromAnnotation) ? null : fromAnnotation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void replaceConfig(String name, Object newConfig, DependentResourceSpec<
namedDependentResourceSpecs.put(name,
new DependentResourceSpec<>(current.getDependentResourceClass(), newConfig, name,
current.getDependsOn(), current.getReadyCondition(), current.getReconcileCondition(),
current.getDeletePostCondition()));
current.getDeletePostCondition(), current.getEventSourceName().orElse(null)));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -220,7 +220,8 @@ public ControllerConfiguration<R> build() {
KubernetesDependentResourceConfig c) {
return new DependentResourceSpec(spec.getDependentResourceClass(),
c.setNamespaces(namespaces), name, spec.getDependsOn(), spec.getReadyCondition(),
spec.getReconcileCondition(), spec.getDeletePostCondition());
spec.getReconcileCondition(), spec.getDeletePostCondition(),
(String) spec.getEventSourceName().orElse(null));
}

public static <R extends HasMetadata> ControllerConfigurationOverrider<R> override(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@ public class DependentResourceSpec<T extends DependentResource<?, ?>, C> {

private final Condition<?, ?> deletePostCondition;

private final String eventSourceName;

public DependentResourceSpec(Class<T> dependentResourceClass, C dependentResourceConfig,
String name, Set<String> dependsOn, Condition<?, ?> readyCondition,
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition) {
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition,
String eventSourceName) {
this.dependentResourceClass = dependentResourceClass;
this.dependentResourceConfig = dependentResourceConfig;
this.name = name;
this.dependsOn = dependsOn;
this.readyCondition = readyCondition;
this.reconcileCondition = reconcileCondition;
this.deletePostCondition = deletePostCondition;
this.eventSourceName = eventSourceName;
}

public Class<T> getDependentResourceClass() {
Expand Down Expand Up @@ -89,4 +93,8 @@ public Condition getReconcileCondition() {
public Condition getDeletePostCondition() {
return deletePostCondition;
}

public Optional<String> getEventSourceName() {
return Optional.ofNullable(eventSourceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;

Expand All @@ -13,14 +15,14 @@
*/
public class EventSourceContext<P extends HasMetadata> {

private final IndexerResourceCache<P> primaryCache;
private final EventSourceManager<P> eventSourceManager;
private final ControllerConfiguration<P> controllerConfiguration;
private final KubernetesClient client;

public EventSourceContext(IndexerResourceCache<P> primaryCache,
public EventSourceContext(EventSourceManager<P> eventSourceManager,
ControllerConfiguration<P> controllerConfiguration,
KubernetesClient client) {
this.primaryCache = primaryCache;
this.eventSourceManager = eventSourceManager;
this.controllerConfiguration = controllerConfiguration;
this.client = client;
}
Expand All @@ -31,7 +33,7 @@ public EventSourceContext(IndexerResourceCache<P> primaryCache,
* @return the primary resource cache
*/
public IndexerResourceCache<P> getPrimaryCache() {
return primaryCache;
return eventSourceManager.getControllerResourceEventSource();
}

/**
Expand All @@ -54,4 +56,8 @@ public ControllerConfiguration<P> getControllerConfiguration() {
public KubernetesClient getClient() {
return client;
}

public EventSourceRetriever<P> getEventSourceRetriever() {
return eventSourceManager;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

/**
Expand Down Expand Up @@ -39,6 +43,21 @@ static Map<String, EventSource> nameEventSources(EventSource... eventSources) {
return eventSourceMap;
}

@SuppressWarnings("unchecked,rawtypes")
static <K extends HasMetadata> Map<String, EventSource> nameEventSourcesFromDependentResource(
EventSourceContext<K> context, DependentResource... dependentResources) {
if (dependentResources != null) {
Map<String, EventSource> eventSourceMap = new HashMap<>(dependentResources.length);
Arrays.stream(dependentResources)
.filter(EventSourceAware.class::isInstance)
.forEach(esa -> ((EventSourceAware<?, K>) esa).eventSource(context)
.ifPresent(es -> eventSourceMap.put(generateNameFor(es), es)));
return eventSourceMap;
} else {
return Collections.emptyMap();
}
}

/**
* This is for the use case when the event sources are not access explicitly by name in the
* reconciler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,13 @@
* one can be
*/
String[] dependsOn() default {};

/**
* Setting here a name of the event source means that dependent resource will use an event source
* registered with that name. So won't create one. This is helpful if more dependent resources
* created for the same type, and want to share a common event source.
*
* @return event source name (if any) provided by the dependent resource should be used.
*/
String eventSource() default NO_VALUE_SET;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;

public interface EventSourceAware<R, P extends HasMetadata> {

/**
* Dependent resources are designed to by default provide event sources. There are cases where it
* might not:
* <ul>
* <li>If an event source is shared between multiple dependent resources. In this case only one or
* none of the dependent resources sharing the event source should provide one.</li>
* <li>Some special implementation of an event source. That just execute some action might not
* provide one.</li>
* </ul>
*
* @param context context of event source initialization
* @return an optional event source
*/
default Optional<ResourceEventSource<R, P>> eventSource(EventSourceContext<P> context) {
return getUsedEventSourceName().map(
name -> context.getEventSourceRetriever().getResourceEventSourceFor(resourceType(), name));
}

Class<R> resourceType();

void useEventSourceNamed(String eventSourceName);

default Optional<String> getUsedEventSourceName() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

/**
* @deprecated now event source related methods are directly on {@link DependentResource}
* @param <P> primary resource
*/
@Deprecated(forRemoval = true)
public interface EventSourceProvider<P extends HasMetadata> {
/**
* @param context - event source context where the event source is initialized
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class ReconcileResult<R> {

private final R resource;
private final Operation operation;
private final Map<R, Operation> resourceOperations;

public static <T> ReconcileResult<T> resourceCreated(T resource) {
return new ReconcileResult<>(resource, Operation.CREATED);
Expand All @@ -22,25 +22,49 @@ public static <T> ReconcileResult<T> noOperation(T resource) {
return new ReconcileResult<>(resource, Operation.NONE);
}

@SafeVarargs
public static <T> ReconcileResult<T> aggregatedResult(ReconcileResult<T>... results) {
if (results == null) {
throw new IllegalArgumentException("Should provide results to aggregate");
}
if (results.length == 1) {
return results[0];
}
final Map<T, Operation> operations = new HashMap<>(results.length);
for (ReconcileResult<T> res : results) {
res.getSingleResource().ifPresent(r -> operations.put(r, res.getSingleOperation()));
}
return new ReconcileResult<>(operations);
}

@Override
public String toString() {
return getResource()
.map(r -> r instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) r) : r)
.orElse("no resource")
+ " -> " + operation;
return resourceOperations.entrySet().stream().collect(Collectors.toMap(
e -> e instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) e) : e,
Map.Entry::getValue))
.toString();
}

private ReconcileResult(R resource, Operation operation) {
this.resource = resource;
this.operation = operation;
resourceOperations = resource != null ? Map.of(resource, operation) : Collections.emptyMap();
}

private ReconcileResult(Map<R, Operation> operations) {
resourceOperations = Collections.unmodifiableMap(operations);
}

public Optional<R> getSingleResource() {
return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getKey);
}

public Optional<R> getResource() {
return Optional.ofNullable(resource);
public Operation getSingleOperation() {
return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getValue)
.orElseThrow();
}

public Operation getOperation() {
return operation;
@SuppressWarnings("unused")
public Map<R, Operation> getResourceOperations() {
return resourceOperations;
}

public enum Operation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
Expand Down Expand Up @@ -207,21 +208,26 @@ private void initContextIfNeeded(P resource, Context<P> context) {
}

public void initAndRegisterEventSources(EventSourceContext<P> context) {
managedWorkflow
.getDependentResourcesByName().entrySet().stream()
.filter(drEntry -> drEntry.getValue() instanceof EventSourceProvider)
.forEach(drEntry -> {
final var provider = (EventSourceProvider) drEntry.getValue();
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(drEntry.getKey(), source);
});

// add manually defined event sources
if (reconciler instanceof EventSourceInitializer) {
final var provider = (EventSourceInitializer<P>) this.reconciler;
final var ownSources = provider.prepareEventSources(context);
ownSources.forEach(eventSourceManager::registerEventSource);
}
managedWorkflow.getDependentResourcesByName().entrySet().stream().filter(entry -> {
final var value = entry.getValue();
return value instanceof EventSourceProvider || value instanceof EventSourceAware;
}).forEach(entry -> {
final var value = entry.getValue();
final var key = entry.getKey();
if (value instanceof EventSourceProvider) {
final var provider = (EventSourceProvider) value;
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(key, source);
} else {
((EventSourceAware<?, P>) value).eventSource(context)
.ifPresent(es -> eventSourceManager.registerEventSource(key, es));
}
});
}

@Override
Expand Down Expand Up @@ -288,8 +294,8 @@ public synchronized void start(boolean startEventProcessor) throws OperatorExcep
try {
// check that the custom resource is known by the cluster if configured that way
validateCRDWithLocalModelIfRequired(resClass, controllerName, crdName, specVersion);
final var context = new EventSourceContext<>(
eventSourceManager.getControllerResourceEventSource(), configuration, kubernetesClient);
final var context =
new EventSourceContext<>(eventSourceManager, configuration, kubernetesClient);

initAndRegisterEventSources(context);
eventSourceManager.start();
Expand Down
Loading