1
1
package io .javaoperatorsdk .operator .processing ;
2
2
3
- import java .util .ArrayList ;
3
+ import java .util .LinkedList ;
4
4
import java .util .List ;
5
5
import java .util .Objects ;
6
- import java .util .concurrent .atomic .AtomicBoolean ;
7
- import java .util .concurrent .atomic .AtomicInteger ;
8
6
9
7
import org .slf4j .Logger ;
10
8
import org .slf4j .LoggerFactory ;
22
20
import io .javaoperatorsdk .operator .api .config .BaseConfigurationService ;
23
21
import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
24
22
import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
25
- import io .javaoperatorsdk .operator .api .config .DependentResource ;
26
23
import io .javaoperatorsdk .operator .api .config .Version ;
27
24
import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
28
25
import io .javaoperatorsdk .operator .api .monitoring .Metrics .ControllerExecution ;
29
26
import io .javaoperatorsdk .operator .api .reconciler .Context ;
30
- import io .javaoperatorsdk .operator .api .reconciler .ContextInitializer ;
31
27
import io .javaoperatorsdk .operator .api .reconciler .DeleteControl ;
32
28
import io .javaoperatorsdk .operator .api .reconciler .EventSourceContext ;
33
- import io .javaoperatorsdk .operator .api .reconciler .EventSourceContextInjector ;
34
29
import io .javaoperatorsdk .operator .api .reconciler .EventSourceInitializer ;
35
30
import io .javaoperatorsdk .operator .api .reconciler .Reconciler ;
36
31
import io .javaoperatorsdk .operator .api .reconciler .UpdateControl ;
37
- import io .javaoperatorsdk .operator .api .reconciler .dependent .DependentResourceController ;
38
- import io .javaoperatorsdk .operator .api .reconciler .dependent .KubernetesDependentResourceController ;
32
+ import io .javaoperatorsdk .operator .processing .dependent .DependentResourceManager ;
39
33
import io .javaoperatorsdk .operator .processing .event .EventSourceManager ;
40
34
import io .javaoperatorsdk .operator .processing .event .source .EventSource ;
41
35
@@ -48,9 +42,9 @@ public class Controller<R extends HasMetadata> implements Reconciler<R>,
48
42
private final Reconciler <R > reconciler ;
49
43
private final ControllerConfiguration <R > configuration ;
50
44
private final KubernetesClient kubernetesClient ;
51
- private final AtomicBoolean started = new AtomicBoolean (false );
52
- private List <DependentResourceController > dependents ;
53
45
private final EventSourceManager <R > eventSourceManager ;
46
+ private final DependentResourceManager <R > dependents ;
47
+
54
48
private ConfigurationService configurationService ;
55
49
56
50
public Controller (Reconciler <R > reconciler ,
@@ -61,58 +55,12 @@ public Controller(Reconciler<R> reconciler,
61
55
this .kubernetesClient = kubernetesClient ;
62
56
63
57
eventSourceManager = new EventSourceManager <>(this );
64
- final var context = new EventSourceContext <>(
65
- eventSourceManager .getControllerResourceEventSource ().getResourceCache (),
66
- configurationService (), kubernetesClient );
67
-
68
- if (reconciler instanceof EventSourceContextInjector ) {
69
- EventSourceContextInjector injector = (EventSourceContextInjector ) reconciler ;
70
- injector .injectInto (context );
71
- }
72
-
73
- prepareEventSources (context ).forEach (eventSourceManager ::registerEventSource );
74
- }
75
-
76
- private void waitUntilStartedAndInitContext (R resource , Context context ) {
77
- if (!started .get ()) {
78
- AtomicInteger count = new AtomicInteger (0 );
79
- final var waitTime = 50 ;
80
- while (!started .get ()) {
81
- try {
82
- count .getAndIncrement ();
83
- Thread .sleep (waitTime );
84
- } catch (InterruptedException e ) {
85
- Thread .currentThread ().interrupt ();
86
- }
87
- }
88
- log .info ("Waited {}ms for controller '{}' to finish initializing" , count .get () * waitTime ,
89
- configuration .getName ());
90
- }
91
-
92
- if (reconciler instanceof ContextInitializer ) {
93
- final var initializer = (ContextInitializer <R >) reconciler ;
94
- initializer .initContext (resource , context );
95
- }
58
+ dependents = new DependentResourceManager <>(this );
96
59
}
97
60
98
61
@ Override
99
62
public DeleteControl cleanup (R resource , Context context ) {
100
- waitUntilStartedAndInitContext (resource , context );
101
-
102
- dependents .stream ()
103
- .filter (DependentResourceController ::deletable )
104
- .forEach (dependent -> {
105
- var dependentResource = dependent .getFor (resource , context );
106
- if (dependentResource != null ) {
107
- dependent .delete (dependentResource , resource , context );
108
- logOperationInfo (resource , dependent , dependentResource , "Deleting" );
109
- } else {
110
- log .info ("Ignoring already deleted {} for '{}' {}" ,
111
- dependent .getResourceType ().getName (),
112
- resource .getMetadata ().getName (),
113
- configuration .getResourceTypeName ());
114
- }
115
- });
63
+ dependents .cleanup (resource , context );
116
64
117
65
return metrics ().timeControllerExecution (
118
66
new ControllerExecution <>() {
@@ -140,23 +88,7 @@ public DeleteControl execute() {
140
88
141
89
@ Override
142
90
public UpdateControl <R > reconcile (R resource , Context context ) {
143
- waitUntilStartedAndInitContext (resource , context );
144
-
145
- dependents .stream ()
146
- .filter (dependent -> dependent .creatable () || dependent .updatable ())
147
- .forEach (dependent -> {
148
- var dependentResource = dependent .getFor (resource , context );
149
- if (dependent .creatable () && dependentResource == null ) {
150
- // we need to create the dependent
151
- dependentResource = dependent .buildFor (resource , context );
152
- createOrReplaceDependent (resource , context , dependent , dependentResource , "Creating" );
153
- } else if (dependent .updatable ()) {
154
- dependentResource = dependent .update (dependentResource , resource , context );
155
- createOrReplaceDependent (resource , context , dependent , dependentResource , "Updating" );
156
- } else {
157
- logOperationInfo (resource , dependent , dependentResource , "Ignoring" );
158
- }
159
- });
91
+ dependents .reconcile (resource , context );
160
92
161
93
return metrics ().timeControllerExecution (
162
94
new ControllerExecution <>() {
@@ -189,30 +121,6 @@ public UpdateControl<R> execute() {
189
121
});
190
122
}
191
123
192
- // todo: rename variables more explicitly
193
- private void createOrReplaceDependent (R resource ,
194
- Context context , DependentResourceController dependent ,
195
- Object dependentResource , String operationDescription ) {
196
- // add owner reference if needed
197
- if (dependentResource instanceof HasMetadata
198
- && ((KubernetesDependentResourceController ) dependent ).owned ()) {
199
- ((HasMetadata ) dependentResource ).addOwnerReference (resource );
200
- }
201
-
202
- logOperationInfo (resource , dependent , dependentResource , operationDescription );
203
-
204
- // commit the changes
205
- // todo: add metrics timing for dependent resource
206
- dependent .createOrReplace (dependentResource , context );
207
- }
208
-
209
- private void logOperationInfo (R resource , DependentResourceController dependent ,
210
- Object dependentResource , String operationDescription ) {
211
- log .info ("{} {} for '{}' {}" , operationDescription , dependent .descriptionFor (dependentResource ),
212
- resource .getMetadata ().getName (),
213
- configuration .getResourceTypeName ());
214
- }
215
-
216
124
217
125
private Metrics metrics () {
218
126
final var metrics = configurationService ().getMetrics ();
@@ -221,14 +129,8 @@ private Metrics metrics() {
221
129
222
130
@ Override
223
131
public List <EventSource > prepareEventSources (EventSourceContext <R > context ) {
224
- final List <DependentResource > configured = configuration .getDependentResources ();
225
- dependents = new ArrayList <>(configured .size ());
226
-
227
- List <EventSource > sources = new ArrayList <>(configured .size () + 5 );
228
- configured .forEach (dependent -> {
229
- dependents .add (configuration .dependentFactory ().from (dependent ));
230
- sources .add (dependent .initEventSource (context ));
231
- });
132
+ final var dependentSources = dependents .prepareEventSources (context );
133
+ List <EventSource > sources = new LinkedList <>(dependentSources );
232
134
233
135
// add manually defined event sources
234
136
if (reconciler instanceof EventSourceInitializer ) {
@@ -312,8 +214,17 @@ public void start() throws OperatorException {
312
214
+ controllerName
313
215
+ "' is configured to watch the current namespace but it couldn't be inferred from the current configuration." );
314
216
}
217
+
218
+ final var context = new EventSourceContext <>(
219
+ eventSourceManager .getControllerResourceEventSource ().getResourceCache (),
220
+ configurationService (), kubernetesClient );
221
+
222
+ dependents .injectInto (context );
223
+ prepareEventSources (context ).forEach (eventSourceManager ::registerEventSource );
224
+
225
+ dependents .start ();
315
226
eventSourceManager .start ();
316
- started . set ( true );
227
+
317
228
log .info ("'{}' controller started, pending event sources initialization" , controllerName );
318
229
} catch (MissingCRDException e ) {
319
230
throwMissingCRDException (crdName , specVersion , controllerName );
@@ -374,6 +285,6 @@ public void stop() {
374
285
if (eventSourceManager != null ) {
375
286
eventSourceManager .stop ();
376
287
}
377
- started . set ( false );
288
+ dependents . stop ( );
378
289
}
379
290
}
0 commit comments