Skip to content

Commit 79c63fb

Browse files
committed
feat: optional eventsource on dependent resources (#1479)
1 parent 733e362 commit 79c63fb

File tree

53 files changed

+941
-300
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+941
-300
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ public List<DependentResourceSpec> getDependentResources() {
247247
Set.of(dependent.dependsOn()),
248248
instantiateIfNotDefault(dependent.readyPostcondition(), Condition.class, context),
249249
instantiateIfNotDefault(dependent.reconcilePrecondition(), Condition.class, context),
250-
instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context));
250+
instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context),
251+
dependent.provideEventSource());
251252
specsMap.put(name, spec);
252253
}
253254

@@ -286,13 +287,13 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
286287
OnDeleteFilter<? extends HasMetadata> onDeleteFilter = null;
287288
GenericFilter<? extends HasMetadata> genericFilter = null;
288289
ResourceDiscriminator<?, ? extends HasMetadata> resourceDiscriminator = null;
290+
String eventSourceNameToUse = null;
289291
if (kubeDependent != null) {
290292
if (!Arrays.equals(KubernetesDependent.DEFAULT_NAMESPACES,
291293
kubeDependent.namespaces())) {
292294
namespaces = Set.of(kubeDependent.namespaces());
293295
configuredNS = true;
294296
}
295-
296297
final var fromAnnotation = kubeDependent.labelSelector();
297298
labelSelector = Constants.NO_VALUE_SET.equals(fromAnnotation) ? null : fromAnnotation;
298299

@@ -313,12 +314,14 @@ private Object createKubernetesResourceConfig(Class<? extends DependentResource>
313314
resourceDiscriminator =
314315
instantiateIfNotDefault(kubeDependent.resourceDiscriminator(),
315316
ResourceDiscriminator.class, context);
317+
eventSourceNameToUse = Constants.NO_VALUE_SET.equals(kubeDependent.eventSourceToUse()) ? null
318+
: kubeDependent.eventSourceToUse();
316319
}
317320

318321
config =
319322
new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS,
320323
resourceDiscriminator, onAddFilter,
321-
onUpdateFilter, onDeleteFilter, genericFilter);
324+
onUpdateFilter, onDeleteFilter, genericFilter, eventSourceNameToUse);
322325

323326
return config;
324327
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private void replaceConfig(String name, Object newConfig, DependentResourceSpec<
174174
namedDependentResourceSpecs.put(name,
175175
new DependentResourceSpec<>(current.getDependentResourceClass(), newConfig, name,
176176
current.getDependsOn(), current.getReadyCondition(), current.getReconcileCondition(),
177-
current.getDeletePostCondition()));
177+
current.getDeletePostCondition(), current.provideEventSource()));
178178
}
179179

180180
@SuppressWarnings("unchecked")
@@ -220,7 +220,7 @@ public ControllerConfiguration<R> build() {
220220
KubernetesDependentResourceConfig c) {
221221
return new DependentResourceSpec(spec.getDependentResourceClass(),
222222
c.setNamespaces(namespaces), name, spec.getDependsOn(), spec.getReadyCondition(),
223-
spec.getReconcileCondition(), spec.getDeletePostCondition());
223+
spec.getReconcileCondition(), spec.getDeletePostCondition(), spec.provideEventSource());
224224
}
225225

226226
public static <R extends HasMetadata> ControllerConfigurationOverrider<R> override(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/dependent/DependentResourceSpec.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,20 @@ public class DependentResourceSpec<T extends DependentResource<?, ?>, C> {
2323

2424
private final Condition<?, ?> deletePostCondition;
2525

26+
private final boolean provideEventSource;
27+
2628
public DependentResourceSpec(Class<T> dependentResourceClass, C dependentResourceConfig,
2729
String name, Set<String> dependsOn, Condition<?, ?> readyCondition,
28-
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition) {
30+
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition,
31+
boolean provideEventSource) {
2932
this.dependentResourceClass = dependentResourceClass;
3033
this.dependentResourceConfig = dependentResourceConfig;
3134
this.name = name;
3235
this.dependsOn = dependsOn;
3336
this.readyCondition = readyCondition;
3437
this.reconcileCondition = reconcileCondition;
3538
this.deletePostCondition = deletePostCondition;
39+
this.provideEventSource = provideEventSource;
3640
}
3741

3842
public Class<T> getDependentResourceClass() {
@@ -89,4 +93,8 @@ public Condition getReconcileCondition() {
8993
public Condition getDeletePostCondition() {
9094
return deletePostCondition;
9195
}
96+
97+
public boolean provideEventSource() {
98+
return provideEventSource;
99+
}
92100
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceInitializer.java

+20
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

3+
import java.util.Collections;
34
import java.util.HashMap;
45
import java.util.Map;
6+
import java.util.Optional;
57

68
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
710
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
11+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
812

913
/**
1014
* An interface that a {@link Reconciler} can implement to have the SDK register the provided
@@ -39,6 +43,22 @@ static Map<String, EventSource> nameEventSources(EventSource... eventSources) {
3943
return eventSourceMap;
4044
}
4145

46+
@SuppressWarnings("unchecked,rawtypes")
47+
static <K extends HasMetadata> Map<String, EventSource> nameEventSourcesFromDependentResource(
48+
EventSourceContext<K> context, DependentResource... dependentResources) {
49+
50+
if (dependentResources != null) {
51+
Map<String, EventSource> eventSourceMap = new HashMap<>(dependentResources.length);
52+
for (DependentResource dependentResource : dependentResources) {
53+
Optional<ResourceEventSource> es = dependentResource.eventSource(context);
54+
es.ifPresent(e -> eventSourceMap.put(generateNameFor(e), e));
55+
}
56+
return eventSourceMap;
57+
} else {
58+
return Collections.emptyMap();
59+
}
60+
}
61+
4262
/**
4363
* This is for the use case when the event sources are not access explicitly by name in the
4464
* reconciler.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceDiscriminator.java

+1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@
77
public interface ResourceDiscriminator<R, P extends HasMetadata> {
88

99
Optional<R> distinguish(Class<R> resource, P primary, Context<P> context);
10+
1011
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Dependent.java

+9
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,13 @@
5757
* one can be
5858
*/
5959
String[] dependsOn() default {};
60+
61+
/**
62+
* Setting this to false means that the event source provided by the dependent resource won't be
63+
* used. This is helpful if more dependent resources created for the same type, and want to share
64+
* a common event source. In that case an event source needs to be explicitly registered.
65+
*
66+
* @return if the event source (if any) provided by the dependent resource should be used or not.
67+
*/
68+
boolean provideEventSource() default true;
6069
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java

+27
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import io.fabric8.kubernetes.api.model.HasMetadata;
66
import io.javaoperatorsdk.operator.api.reconciler.Context;
7+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
8+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
79

810
/**
911
* An interface to implement and provide dependent resource support.
@@ -29,6 +31,31 @@ public interface DependentResource<R, P extends HasMetadata> {
2931
*/
3032
Class<R> resourceType();
3133

34+
/**
35+
* Dependent resources are designed to by default provide event sources. There are cases where it
36+
* might not:
37+
* <ul>
38+
* <li>If an event source is shared between multiple dependent resources. In this case only one or
39+
* none of the dependent resources sharing the event source should provide one.</li>
40+
* <li>Some special implementation of an event source. That just execute some action might not
41+
* provide one.</li>
42+
* </ul>
43+
*
44+
* @param eventSourceContext context of event source initialization
45+
* @return an optional event source
46+
*/
47+
default Optional<ResourceEventSource<R, P>> eventSource(
48+
EventSourceContext<P> eventSourceContext) {
49+
return Optional.empty();
50+
}
51+
52+
/**
53+
* Calling this method, instructs the implementation to not provide an event source, even if it
54+
* normally does.
55+
*/
56+
void doNotProvideEventSource();
57+
58+
3259
default Optional<R> getSecondaryResource(P primary, Context<P> context) {
3360
return Optional.empty();
3461
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.javaoperatorsdk.operator.api.reconciler.dependent;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
5+
6+
public interface EventSourceAware<P extends HasMetadata> {
7+
8+
void selectEventSources(EventSourceRetriever<P> eventSourceRetriever);
9+
10+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
55
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
66

7+
/**
8+
* @deprecated now event source related methods are directly on {@link DependentResource}
9+
* @param <P> primary resource
10+
*/
11+
@Deprecated(forRemoval = true)
712
public interface EventSourceProvider<P extends HasMetadata> {
813
/**
914
* @param context - event source context where the event source is initialized

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

+21-10
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@
3232
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
3333
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
3434
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
35+
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
3536
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
3637
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
3738
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
3839
import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult;
3940
import io.javaoperatorsdk.operator.processing.event.EventProcessor;
4041
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
4142
import io.javaoperatorsdk.operator.processing.event.ResourceID;
43+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
4244

4345
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE;
4446

@@ -207,21 +209,30 @@ private void initContextIfNeeded(P resource, Context<P> context) {
207209
}
208210

209211
public void initAndRegisterEventSources(EventSourceContext<P> context) {
210-
managedWorkflow
211-
.getDependentResourcesByName().entrySet().stream()
212-
.filter(drEntry -> drEntry.getValue() instanceof EventSourceProvider)
213-
.forEach(drEntry -> {
214-
final var provider = (EventSourceProvider) drEntry.getValue();
215-
final var source = provider.initEventSource(context);
216-
eventSourceManager.registerEventSource(drEntry.getKey(), source);
217-
});
218-
219-
// add manually defined event sources
220212
if (reconciler instanceof EventSourceInitializer) {
221213
final var provider = (EventSourceInitializer<P>) this.reconciler;
222214
final var ownSources = provider.prepareEventSources(context);
223215
ownSources.forEach(eventSourceManager::registerEventSource);
224216
}
217+
managedWorkflow
218+
.getDependentResourcesByName().entrySet().stream()
219+
.forEach(drEntry -> {
220+
if (drEntry.getValue() instanceof EventSourceProvider) {
221+
final var provider = (EventSourceProvider) drEntry.getValue();
222+
final var source = provider.initEventSource(context);
223+
eventSourceManager.registerEventSource(drEntry.getKey(), source);
224+
} else {
225+
Optional<ResourceEventSource> eventSource =
226+
drEntry.getValue().eventSource(context);
227+
eventSource.ifPresent(es -> {
228+
eventSourceManager.registerEventSource(drEntry.getKey(), es);
229+
});
230+
}
231+
});
232+
managedWorkflow.getDependentResourcesByName().entrySet().stream().map(Map.Entry::getValue)
233+
.filter(EventSourceAware.class::isInstance)
234+
.forEach(dr -> ((EventSourceAware) dr)
235+
.selectEventSources(eventSourceManager));
225236
}
226237

227238
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
1111
import io.javaoperatorsdk.operator.api.reconciler.Context;
12+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
1213
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
1314
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
1415
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
1516
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
1617
import io.javaoperatorsdk.operator.processing.event.ResourceID;
18+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
1719

1820
@Ignore
1921
public abstract class AbstractDependentResource<R, P extends HasMetadata>
@@ -27,8 +29,9 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>
2729
protected Creator<R, P> creator;
2830
protected Updater<R, P> updater;
2931
protected BulkDependentResource<R, P> bulkDependentResource;
32+
private boolean returnEventSource = true;
3033

31-
private final List<ResourceDiscriminator<R, P>> resourceDiscriminator = new ArrayList<>(1);
34+
protected List<ResourceDiscriminator<R, P>> resourceDiscriminator = new ArrayList<>(1);
3235

3336
@SuppressWarnings("unchecked")
3437
public AbstractDependentResource() {
@@ -38,6 +41,23 @@ public AbstractDependentResource() {
3841
bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
3942
}
4043

44+
@Override
45+
public void doNotProvideEventSource() {
46+
this.returnEventSource = false;
47+
}
48+
49+
@Override
50+
public Optional<ResourceEventSource<R, P>> eventSource(EventSourceContext<P> eventSourceContext) {
51+
if (!returnEventSource) {
52+
return Optional.empty();
53+
} else {
54+
return Optional.of(provideEventSource(eventSourceContext));
55+
}
56+
}
57+
58+
protected abstract ResourceEventSource<R, P> provideEventSource(
59+
EventSourceContext<P> eventSourceContext);
60+
4161
@Override
4262
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
4363
if (bulk) {
@@ -172,7 +192,7 @@ protected R handleCreate(R desired, P primary, Context<P> context) {
172192
protected abstract void onCreated(ResourceID primaryResourceId, R created);
173193

174194
/**
175-
* Allows sub-classes to perform additional processing on the updated resource if needed.
195+
* Allows subclasses to perform additional processing on the updated resource if needed.
176196
*
177197
* @param primaryResourceId the {@link ResourceID} of the primary resource associated with the
178198
* newly updated resource
@@ -219,4 +239,7 @@ protected int lastKnownBulkSize() {
219239
return resourceDiscriminator.size();
220240
}
221241

242+
protected boolean getReturnEventSource() {
243+
return returnEventSource;
244+
}
222245
}

0 commit comments

Comments
 (0)