Skip to content

Commit 066e0fc

Browse files
committed
fix: add ManagedInformerEventSource to manage InformerManager init cycle
1 parent 8c1b5c3 commit 066e0fc

File tree

12 files changed

+94
-70
lines changed

12 files changed

+94
-70
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ public class DefaultResourceConfiguration<R extends HasMetadata>
1111
private final String labelSelector;
1212
private final Set<String> namespaces;
1313
private final boolean watchAllNamespaces;
14+
private final Class<R> resourceClass;
1415
private ConfigurationService service;
1516

16-
public DefaultResourceConfiguration(String labelSelector, String... namespaces) {
17+
public DefaultResourceConfiguration(String labelSelector, Class<R> resourceClass,
18+
String... namespaces) {
1719
this.labelSelector = labelSelector;
20+
this.resourceClass = resourceClass;
1821
this.namespaces = namespaces != null ? Set.of(namespaces) : Collections.emptySet();
1922
this.watchAllNamespaces = this.namespaces.isEmpty();
2023
}
@@ -44,6 +47,11 @@ public ConfigurationService getConfigurationService() {
4447
return service;
4548
}
4649

50+
@Override
51+
public Class<R> getResourceClass() {
52+
return resourceClass;
53+
}
54+
4755
@Override
4856
public void setConfigurationService(ConfigurationService service) {
4957
this.service = service;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java

-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import java.util.stream.Stream;
88

99
import io.fabric8.kubernetes.api.model.HasMetadata;
10-
import io.javaoperatorsdk.operator.OperatorException;
1110
import io.javaoperatorsdk.operator.processing.event.Event;
1211
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1312

@@ -84,11 +83,6 @@ public Optional<T> getCachedValue(ResourceID resourceID) {
8483
return cache.get(resourceID);
8584
}
8685

87-
@Override
88-
public void stop() throws OperatorException {
89-
super.stop();
90-
}
91-
9286
@Override
9387
public T getAssociated(P primary) {
9488
return cache.get(ResourceID.fromResource(primary)).orElse(null);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

+6-18
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@
1111
import io.javaoperatorsdk.operator.processing.Controller;
1212
import io.javaoperatorsdk.operator.processing.MDCUtils;
1313
import io.javaoperatorsdk.operator.processing.event.ResourceID;
14-
import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource;
15-
import io.javaoperatorsdk.operator.processing.event.source.InformerManager;
1614
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
15+
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
1716

1817
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
1918
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
2019
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
2120

2221
public class ControllerResourceEventSource<T extends HasMetadata>
23-
extends AbstractResourceEventSource<T, T>
22+
extends ManagedInformerEventSource<T, T, ControllerConfiguration<T>>
2423
implements ResourceEventHandler<T> {
2524

2625
public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
@@ -30,13 +29,10 @@ public class ControllerResourceEventSource<T extends HasMetadata>
3029
private final Controller<T> controller;
3130
private final ResourceEventFilter<T> filter;
3231
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
33-
private final InformerManager<T, ControllerConfiguration<T>> informerManager;
3432

3533
public ControllerResourceEventSource(Controller<T> controller) {
36-
super(controller.getConfiguration().getResourceClass());
34+
super(controller.getCRClient(), controller.getConfiguration());
3735
this.controller = controller;
38-
this.informerManager = new InformerManager<>(controller.getCRClient(),
39-
controller.getConfiguration(), this);
4036

4137
var filters = new ResourceEventFilter[] {
4238
ResourceEventFilters.finalizerNeededAndApplied(),
@@ -59,21 +55,13 @@ public ControllerResourceEventSource(Controller<T> controller) {
5955
@Override
6056
public void start() {
6157
try {
62-
informerManager.start();
58+
super.start();
6359
} catch (Exception e) {
6460
if (e instanceof KubernetesClientException) {
6561
handleKubernetesClientException(e);
6662
}
6763
throw e;
6864
}
69-
super.start();
70-
}
71-
72-
73-
@Override
74-
public void stop() {
75-
informerManager.stop();
76-
super.stop();
7765
}
7866

7967
public void eventReceived(ResourceAction action, T resource, T oldResource) {
@@ -109,7 +97,7 @@ public void onDelete(T resource, boolean b) {
10997
}
11098

11199
public ResourceCache<T> getResourceCache() {
112-
return informerManager;
100+
return manager();
113101
}
114102

115103
/**
@@ -138,6 +126,6 @@ private void handleKubernetesClientException(Exception e) {
138126

139127
@Override
140128
public T getAssociated(T primary) {
141-
return informerManager.get(ResourceID.fromResource(primary)).orElse(null);
129+
return manager().get(ResourceID.fromResource(primary)).orElse(null);
142130
}
143131
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerConfiguration.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.Objects;
44

55
import io.fabric8.kubernetes.api.model.HasMetadata;
6+
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
67
import io.javaoperatorsdk.operator.api.config.DefaultResourceConfiguration;
78
import io.javaoperatorsdk.operator.processing.event.ResourceID;
89
import io.javaoperatorsdk.operator.processing.event.source.AssociatedSecondaryResourceIdentifier;
@@ -15,15 +16,19 @@ public class InformerConfiguration<R extends HasMetadata, P extends HasMetadata>
1516
private final AssociatedSecondaryResourceIdentifier<P> associatedWith;
1617
private final boolean skipUpdateEventPropagationIfNoChange;
1718

18-
public InformerConfiguration(String labelSelector, String... namespaces) {
19-
this(labelSelector, Mappers.fromOwnerReference(), null, true, namespaces);
19+
public InformerConfiguration(ConfigurationService service, String labelSelector,
20+
Class<R> resourceClass, String... namespaces) {
21+
this(service, labelSelector, resourceClass, Mappers.fromOwnerReference(), null, true,
22+
namespaces);
2023
}
2124

22-
public InformerConfiguration(String labelSelector,
25+
public InformerConfiguration(ConfigurationService service, String labelSelector,
26+
Class<R> resourceClass,
2327
PrimaryResourcesRetriever<R> secondaryToPrimaryResourcesIdSet,
2428
AssociatedSecondaryResourceIdentifier<P> associatedWith,
2529
boolean skipUpdateEventPropagationIfNoChange, String... namespaces) {
26-
super(labelSelector, namespaces);
30+
super(labelSelector, resourceClass, namespaces);
31+
setConfigurationService(service);
2732
this.secondaryToPrimaryResourcesIdSet = secondaryToPrimaryResourcesIdSet;
2833
this.associatedWith =
2934
Objects.requireNonNullElseGet(associatedWith, () -> ResourceID::fromResource);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+5-16
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,17 @@
1010
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
1111
import io.javaoperatorsdk.operator.processing.event.Event;
1212
import io.javaoperatorsdk.operator.processing.event.EventHandler;
13-
import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;
14-
import io.javaoperatorsdk.operator.processing.event.source.InformerManager;
15-
import io.javaoperatorsdk.operator.processing.event.source.InformerResourceCache;
1613
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
17-
import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache;
1814

1915
public class InformerEventSource<T extends HasMetadata, P extends HasMetadata>
20-
extends CachingEventSource<T, P>
16+
extends ManagedInformerEventSource<T, P, InformerConfiguration<T, P>>
2117
implements ResourceCache<T>, ResourceEventHandler<T> {
2218

23-
private final InformerManager<T, InformerConfiguration<T, P>> informerManager;
2419
private final InformerConfiguration<T, P> configuration;
2520

2621
public InformerEventSource(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
2722
InformerConfiguration<T, P> configuration) {
28-
super(configuration.getResourceClass());
29-
this.informerManager = new InformerManager<>(client, configuration, this);
23+
super(client, configuration);
3024
this.configuration = configuration;
3125
}
3226

@@ -72,12 +66,12 @@ private void propagateEvent(T object) {
7266

7367
@Override
7468
public void start() {
75-
informerManager.start();
69+
manager().start();
7670
}
7771

7872
@Override
7973
public void stop() {
80-
informerManager.stop();
74+
manager().stop();
8175
}
8276

8377
/**
@@ -92,13 +86,8 @@ public T getAssociated(P resource) {
9286
return get(id).orElse(null);
9387
}
9488

95-
@Override
96-
protected UpdatableCache<T> initCache() {
97-
return informerManager;
98-
}
99-
10089
@Override
10190
public Stream<T> list(String namespace, Predicate<T> predicate) {
102-
return ((InformerResourceCache<T>) cache).list(namespace, predicate);
91+
return manager().list(namespace, predicate);
10392
}
10493
}
+10-13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.javaoperatorsdk.operator.processing.event.source;
1+
package io.javaoperatorsdk.operator.processing.event.source.informer;
22

33
import java.util.Map;
44
import java.util.Optional;
@@ -20,7 +20,10 @@
2020
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
2121
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2222
import io.javaoperatorsdk.operator.processing.event.ResourceID;
23-
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper;
23+
import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource;
24+
import io.javaoperatorsdk.operator.processing.event.source.Cache;
25+
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
26+
import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache;
2427

2528
public class InformerManager<T extends HasMetadata, C extends ResourceConfiguration<T>>
2629
implements LifecycleAware, ResourceCache<T>, UpdatableCache<T> {
@@ -29,23 +32,17 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
2932
private static final Logger log = LoggerFactory.getLogger(AbstractResourceEventSource.class);
3033

3134
private final Map<String, InformerWrapper<T>> sources = new ConcurrentHashMap<>();
32-
private final MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
33-
private final Cloner cloner;
34-
35-
public InformerManager(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
36-
C configuration, ResourceEventHandler<T> eventHandler) {
37-
this.client = client;
38-
this.cloner = configuration.getConfigurationService().getResourceCloner();
39-
40-
initSources(configuration, eventHandler);
41-
}
35+
private Cloner cloner;
4236

4337
@Override
4438
public void start() throws OperatorException {
4539
sources.values().parallelStream().forEach(LifecycleAware::start);
4640
}
4741

48-
private void initSources(C configuration, ResourceEventHandler<T> eventHandler) {
42+
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
43+
C configuration, ResourceEventHandler<T> eventHandler) {
44+
this.cloner = configuration.getConfigurationService().getResourceCloner();
45+
4946
final var targetNamespaces = configuration.getEffectiveNamespaces();
5047
final var labelSelector = configuration.getLabelSelector();
5148

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.javaoperatorsdk.operator.processing.event.source;
1+
package io.javaoperatorsdk.operator.processing.event.source.informer;
22

33
import java.util.Objects;
44
import java.util.Optional;
@@ -9,10 +9,10 @@
99
import io.fabric8.kubernetes.client.informers.SharedInformer;
1010
import io.fabric8.kubernetes.client.informers.cache.Cache;
1111
import io.javaoperatorsdk.operator.processing.event.ResourceID;
12-
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
12+
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
13+
import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache;
1314

14-
public class InformerResourceCache<T extends HasMetadata>
15-
implements ResourceCache<T>, UpdatableCache<T> {
15+
class InformerResourceCache<T extends HasMetadata> implements ResourceCache<T>, UpdatableCache<T> {
1616

1717
private final Cache<T> cache;
1818

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@
1010
import io.javaoperatorsdk.operator.OperatorException;
1111
import io.javaoperatorsdk.operator.processing.LifecycleAware;
1212
import io.javaoperatorsdk.operator.processing.event.ResourceID;
13-
import io.javaoperatorsdk.operator.processing.event.source.InformerResourceCache;
1413
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
1514
import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache;
1615

17-
public class InformerWrapper<T extends HasMetadata>
16+
class InformerWrapper<T extends HasMetadata>
1817
implements LifecycleAware, ResourceCache<T>, UpdatableCache<T> {
1918
private final SharedIndexInformer<T> informer;
2019
private final InformerResourceCache<T> cache;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.informer;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
5+
import io.fabric8.kubernetes.client.dsl.MixedOperation;
6+
import io.fabric8.kubernetes.client.dsl.Resource;
7+
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
8+
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
9+
import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;
10+
import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache;
11+
12+
public abstract class ManagedInformerEventSource<R extends HasMetadata, P extends HasMetadata, C extends ResourceConfiguration<R>>
13+
extends CachingEventSource<R, P>
14+
implements ResourceEventHandler<R> {
15+
16+
public ManagedInformerEventSource(
17+
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
18+
super(configuration.getResourceClass());
19+
manager().initSources(client, configuration, this);
20+
}
21+
22+
@Override
23+
protected UpdatableCache<R> initCache() {
24+
return new InformerManager<>();
25+
}
26+
27+
protected InformerManager<R, C> manager() {
28+
return (InformerManager<R, C>) cache;
29+
}
30+
31+
@Override
32+
public void start() {
33+
manager().start();
34+
super.start();
35+
}
36+
37+
38+
@Override
39+
public void stop() {
40+
super.stop();
41+
manager().stop();
42+
}
43+
}

operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomReconciler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public class InformerEventSourceTestCustomReconciler implements
4242
public List<EventSource> prepareEventSources(
4343
EventSourceInitializationContext<InformerEventSourceTestCustomResource> context) {
4444
final var config = new InformerConfiguration<ConfigMap, InformerEventSourceTestCustomResource>(
45-
null, Mappers.fromAnnotation(RELATED_RESOURCE_NAME), null, true);
45+
context.getConfigurationService(), null, ConfigMap.class,
46+
Mappers.fromAnnotation(RELATED_RESOURCE_NAME), null, true);
4647
return List.of(new InformerEventSource<>(kubernetesClient.resources(ConfigMap.class), config));
4748
}
4849

sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatReconciler.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ public TomcatReconciler(KubernetesClient client) {
4343

4444
@Override
4545
public List<EventSource> prepareEventSources(EventSourceInitializationContext<Tomcat> context) {
46-
final var deploymentConfig = new InformerConfiguration<Deployment, Tomcat>(
47-
"app.kubernetes.io/managed-by=tomcat-operator");
46+
final var deploymentConfig =
47+
new InformerConfiguration<Deployment, Tomcat>(context.getConfigurationService(),
48+
"app.kubernetes.io/managed-by=tomcat-operator", Deployment.class);
4849

4950
return List.of(
5051
new InformerEventSource<>(kubernetesClient.resources(Deployment.class), deploymentConfig));

sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/WebappReconciler.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerConfiguration;
3030
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
3131

32-
import okhttp3.Response;
33-
3432
@ControllerConfiguration
3533
public class WebappReconciler implements Reconciler<Webapp>, EventSourceInitializer<Webapp> {
3634

@@ -44,7 +42,8 @@ public WebappReconciler(KubernetesClient kubernetesClient) {
4442

4543
@Override
4644
public List<EventSource> prepareEventSources(EventSourceInitializationContext<Webapp> context) {
47-
final var config = new InformerConfiguration<>(null,
45+
final var config = new InformerConfiguration<>(context.getConfigurationService(), null,
46+
Tomcat.class,
4847
(Tomcat t) -> {
4948
// To create an event to a related WebApp resource and trigger the reconciliation
5049
// we need to find which WebApp this Tomcat custom resource is related to.

0 commit comments

Comments
 (0)