Skip to content

Commit aecc4c2

Browse files
committed
refactor: encapsulate dependent resources in DependentResourceManager
1 parent 7d21314 commit aecc4c2

File tree

2 files changed

+190
-109
lines changed

2 files changed

+190
-109
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

+20-109
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package io.javaoperatorsdk.operator.processing;
22

3-
import java.util.ArrayList;
3+
import java.util.LinkedList;
44
import java.util.List;
55
import java.util.Objects;
6-
import java.util.concurrent.atomic.AtomicBoolean;
7-
import java.util.concurrent.atomic.AtomicInteger;
86

97
import org.slf4j.Logger;
108
import org.slf4j.LoggerFactory;
@@ -22,20 +20,16 @@
2220
import io.javaoperatorsdk.operator.api.config.BaseConfigurationService;
2321
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
2422
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
25-
import io.javaoperatorsdk.operator.api.config.DependentResource;
2623
import io.javaoperatorsdk.operator.api.config.Version;
2724
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
2825
import io.javaoperatorsdk.operator.api.monitoring.Metrics.ControllerExecution;
2926
import io.javaoperatorsdk.operator.api.reconciler.Context;
30-
import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer;
3127
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
3228
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
33-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContextInjector;
3429
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
3530
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
3631
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
37-
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceController;
38-
import io.javaoperatorsdk.operator.api.reconciler.dependent.KubernetesDependentResourceController;
32+
import io.javaoperatorsdk.operator.processing.dependent.DependentResourceManager;
3933
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
4034
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
4135

@@ -48,9 +42,9 @@ public class Controller<R extends HasMetadata> implements Reconciler<R>,
4842
private final Reconciler<R> reconciler;
4943
private final ControllerConfiguration<R> configuration;
5044
private final KubernetesClient kubernetesClient;
51-
private final AtomicBoolean started = new AtomicBoolean(false);
52-
private List<DependentResourceController> dependents;
5345
private final EventSourceManager<R> eventSourceManager;
46+
private final DependentResourceManager<R> dependents;
47+
5448
private ConfigurationService configurationService;
5549

5650
public Controller(Reconciler<R> reconciler,
@@ -61,58 +55,12 @@ public Controller(Reconciler<R> reconciler,
6155
this.kubernetesClient = kubernetesClient;
6256

6357
eventSourceManager = new EventSourceManager<>(this);
64-
final var context = new EventSourceContext<>(
65-
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
66-
configurationService(), kubernetesClient);
67-
68-
if (reconciler instanceof EventSourceContextInjector) {
69-
EventSourceContextInjector injector = (EventSourceContextInjector) reconciler;
70-
injector.injectInto(context);
71-
}
72-
73-
prepareEventSources(context).forEach(eventSourceManager::registerEventSource);
74-
}
75-
76-
private void waitUntilStartedAndInitContext(R resource, Context context) {
77-
if (!started.get()) {
78-
AtomicInteger count = new AtomicInteger(0);
79-
final var waitTime = 50;
80-
while (!started.get()) {
81-
try {
82-
count.getAndIncrement();
83-
Thread.sleep(waitTime);
84-
} catch (InterruptedException e) {
85-
Thread.currentThread().interrupt();
86-
}
87-
}
88-
log.info("Waited {}ms for controller '{}' to finish initializing", count.get() * waitTime,
89-
configuration.getName());
90-
}
91-
92-
if (reconciler instanceof ContextInitializer) {
93-
final var initializer = (ContextInitializer<R>) reconciler;
94-
initializer.initContext(resource, context);
95-
}
58+
dependents = new DependentResourceManager<>(this);
9659
}
9760

9861
@Override
9962
public DeleteControl cleanup(R resource, Context context) {
100-
waitUntilStartedAndInitContext(resource, context);
101-
102-
dependents.stream()
103-
.filter(DependentResourceController::deletable)
104-
.forEach(dependent -> {
105-
var dependentResource = dependent.getFor(resource, context);
106-
if (dependentResource != null) {
107-
dependent.delete(dependentResource, resource, context);
108-
logOperationInfo(resource, dependent, dependentResource, "Deleting");
109-
} else {
110-
log.info("Ignoring already deleted {} for '{}' {}",
111-
dependent.getResourceType().getName(),
112-
resource.getMetadata().getName(),
113-
configuration.getResourceTypeName());
114-
}
115-
});
63+
dependents.cleanup(resource, context);
11664

11765
return metrics().timeControllerExecution(
11866
new ControllerExecution<>() {
@@ -140,23 +88,7 @@ public DeleteControl execute() {
14088

14189
@Override
14290
public UpdateControl<R> reconcile(R resource, Context context) {
143-
waitUntilStartedAndInitContext(resource, context);
144-
145-
dependents.stream()
146-
.filter(dependent -> dependent.creatable() || dependent.updatable())
147-
.forEach(dependent -> {
148-
var dependentResource = dependent.getFor(resource, context);
149-
if (dependent.creatable() && dependentResource == null) {
150-
// we need to create the dependent
151-
dependentResource = dependent.buildFor(resource, context);
152-
createOrReplaceDependent(resource, context, dependent, dependentResource, "Creating");
153-
} else if (dependent.updatable()) {
154-
dependentResource = dependent.update(dependentResource, resource, context);
155-
createOrReplaceDependent(resource, context, dependent, dependentResource, "Updating");
156-
} else {
157-
logOperationInfo(resource, dependent, dependentResource, "Ignoring");
158-
}
159-
});
91+
dependents.reconcile(resource, context);
16092

16193
return metrics().timeControllerExecution(
16294
new ControllerExecution<>() {
@@ -189,30 +121,6 @@ public UpdateControl<R> execute() {
189121
});
190122
}
191123

192-
// todo: rename variables more explicitly
193-
private void createOrReplaceDependent(R resource,
194-
Context context, DependentResourceController dependent,
195-
Object dependentResource, String operationDescription) {
196-
// add owner reference if needed
197-
if (dependentResource instanceof HasMetadata
198-
&& ((KubernetesDependentResourceController) dependent).owned()) {
199-
((HasMetadata) dependentResource).addOwnerReference(resource);
200-
}
201-
202-
logOperationInfo(resource, dependent, dependentResource, operationDescription);
203-
204-
// commit the changes
205-
// todo: add metrics timing for dependent resource
206-
dependent.createOrReplace(dependentResource, context);
207-
}
208-
209-
private void logOperationInfo(R resource, DependentResourceController dependent,
210-
Object dependentResource, String operationDescription) {
211-
log.info("{} {} for '{}' {}", operationDescription, dependent.descriptionFor(dependentResource),
212-
resource.getMetadata().getName(),
213-
configuration.getResourceTypeName());
214-
}
215-
216124

217125
private Metrics metrics() {
218126
final var metrics = configurationService().getMetrics();
@@ -221,14 +129,8 @@ private Metrics metrics() {
221129

222130
@Override
223131
public List<EventSource> prepareEventSources(EventSourceContext<R> context) {
224-
final List<DependentResource> configured = configuration.getDependentResources();
225-
dependents = new ArrayList<>(configured.size());
226-
227-
List<EventSource> sources = new ArrayList<>(configured.size() + 5);
228-
configured.forEach(dependent -> {
229-
dependents.add(configuration.dependentFactory().from(dependent));
230-
sources.add(dependent.initEventSource(context));
231-
});
132+
final var dependentSources = dependents.prepareEventSources(context);
133+
List<EventSource> sources = new LinkedList<>(dependentSources);
232134

233135
// add manually defined event sources
234136
if (reconciler instanceof EventSourceInitializer) {
@@ -312,8 +214,17 @@ public void start() throws OperatorException {
312214
+ controllerName
313215
+ "' is configured to watch the current namespace but it couldn't be inferred from the current configuration.");
314216
}
217+
218+
final var context = new EventSourceContext<>(
219+
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
220+
configurationService(), kubernetesClient);
221+
222+
dependents.injectInto(context);
223+
prepareEventSources(context).forEach(eventSourceManager::registerEventSource);
224+
225+
dependents.start();
315226
eventSourceManager.start();
316-
started.set(true);
227+
317228
log.info("'{}' controller started, pending event sources initialization", controllerName);
318229
} catch (MissingCRDException e) {
319230
throwMissingCRDException(crdName, specVersion, controllerName);
@@ -374,6 +285,6 @@ public void stop() {
374285
if (eventSourceManager != null) {
375286
eventSourceManager.stop();
376287
}
377-
started.set(false);
288+
dependents.stop();
378289
}
379290
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package io.javaoperatorsdk.operator.processing.dependent;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import io.fabric8.kubernetes.api.model.HasMetadata;
12+
import io.javaoperatorsdk.operator.OperatorException;
13+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
14+
import io.javaoperatorsdk.operator.api.config.DependentResource;
15+
import io.javaoperatorsdk.operator.api.reconciler.Context;
16+
import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer;
17+
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
18+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
19+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContextInjector;
20+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
21+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
22+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
23+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceController;
24+
import io.javaoperatorsdk.operator.api.reconciler.dependent.KubernetesDependentResourceController;
25+
import io.javaoperatorsdk.operator.processing.Controller;
26+
import io.javaoperatorsdk.operator.processing.LifecycleAware;
27+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
28+
29+
@SuppressWarnings({"rawtypes", "unchecked"})
30+
public class DependentResourceManager<R extends HasMetadata> implements EventSourceInitializer<R>,
31+
EventSourceContextInjector, Reconciler<R>, LifecycleAware {
32+
33+
private static final Logger log = LoggerFactory.getLogger(DependentResourceManager.class);
34+
35+
private final Reconciler<R> reconciler;
36+
private final ControllerConfiguration<R> configuration;
37+
private List<DependentResourceController> dependents;
38+
private final AtomicBoolean started = new AtomicBoolean(false);
39+
40+
41+
public DependentResourceManager(Controller<R> controller) {
42+
this.reconciler = controller.getReconciler();
43+
this.configuration = controller.getConfiguration();
44+
}
45+
46+
@Override
47+
public List<EventSource> prepareEventSources(EventSourceContext<R> context) {
48+
final List<DependentResource> configured = configuration.getDependentResources();
49+
dependents = new ArrayList<>(configured.size());
50+
51+
List<EventSource> sources = new ArrayList<>(configured.size() + 5);
52+
configured.forEach(dependent -> {
53+
dependents.add(configuration.dependentFactory().from(dependent));
54+
sources.add(dependent.initEventSource(context));
55+
});
56+
57+
return sources;
58+
}
59+
60+
@Override
61+
public void injectInto(EventSourceContext context) {
62+
if (reconciler instanceof EventSourceContextInjector) {
63+
EventSourceContextInjector injector = (EventSourceContextInjector) reconciler;
64+
injector.injectInto(context);
65+
}
66+
}
67+
68+
@Override
69+
public UpdateControl<R> reconcile(R resource, Context context) {
70+
waitUntilStartedAndInitContext(resource, context);
71+
72+
dependents.stream()
73+
.filter(dependent -> dependent.creatable() || dependent.updatable())
74+
.forEach(dependent -> {
75+
var dependentResource = dependent.getFor(resource, context);
76+
if (dependent.creatable() && dependentResource == null) {
77+
// we need to create the dependent
78+
dependentResource = dependent.buildFor(resource, context);
79+
createOrReplaceDependent(resource, context, dependent, dependentResource, "Creating");
80+
} else if (dependent.updatable()) {
81+
dependentResource = dependent.update(dependentResource, resource, context);
82+
createOrReplaceDependent(resource, context, dependent, dependentResource, "Updating");
83+
} else {
84+
logOperationInfo(resource, dependent, dependentResource, "Ignoring");
85+
}
86+
});
87+
88+
return null;
89+
}
90+
91+
@Override
92+
public DeleteControl cleanup(R resource, Context context) {
93+
waitUntilStartedAndInitContext(resource, context);
94+
95+
dependents.stream()
96+
.filter(DependentResourceController::deletable)
97+
.forEach(dependent -> {
98+
var dependentResource = dependent.getFor(resource, context);
99+
if (dependentResource != null) {
100+
dependent.delete(dependentResource, resource, context);
101+
logOperationInfo(resource, dependent, dependentResource, "Deleting");
102+
} else {
103+
log.info("Ignoring already deleted {} for '{}' {}",
104+
dependent.getResourceType().getName(),
105+
resource.getMetadata().getName(),
106+
configuration.getResourceTypeName());
107+
}
108+
});
109+
110+
return Reconciler.super.cleanup(resource, context);
111+
}
112+
113+
private void createOrReplaceDependent(R primaryResource,
114+
Context context, DependentResourceController dependentController,
115+
Object dependentResource, String operationDescription) {
116+
// add owner reference if needed
117+
if (dependentResource instanceof HasMetadata
118+
&& ((KubernetesDependentResourceController) dependentController).owned()) {
119+
((HasMetadata) dependentResource).addOwnerReference(primaryResource);
120+
}
121+
122+
logOperationInfo(primaryResource, dependentController, dependentResource, operationDescription);
123+
124+
// commit the changes
125+
// todo: add metrics timing for dependent resource
126+
dependentController.createOrReplace(dependentResource, context);
127+
}
128+
129+
private void logOperationInfo(R resource, DependentResourceController dependent,
130+
Object dependentResource, String operationDescription) {
131+
if (log.isInfoEnabled()) {
132+
log.info("{} {} for '{}' {}", operationDescription,
133+
dependent.descriptionFor(dependentResource),
134+
resource.getMetadata().getName(),
135+
configuration.getResourceTypeName());
136+
}
137+
}
138+
139+
private void waitUntilStartedAndInitContext(R resource, Context context) {
140+
if (!started.get()) {
141+
AtomicInteger count = new AtomicInteger(0);
142+
final var waitTime = 50;
143+
while (!started.get()) {
144+
try {
145+
count.getAndIncrement();
146+
Thread.sleep(waitTime);
147+
} catch (InterruptedException e) {
148+
Thread.currentThread().interrupt();
149+
}
150+
}
151+
log.info("Waited {}ms for controller '{}' to finish initializing", count.get() * waitTime,
152+
configuration.getName());
153+
}
154+
155+
if (reconciler instanceof ContextInitializer) {
156+
final var initializer = (ContextInitializer<R>) reconciler;
157+
initializer.initContext(resource, context);
158+
}
159+
}
160+
161+
@Override
162+
public void start() throws OperatorException {
163+
started.set(true);
164+
}
165+
166+
@Override
167+
public void stop() throws OperatorException {
168+
started.set(false);
169+
}
170+
}

0 commit comments

Comments
 (0)