1
1
package io .javaoperatorsdk .operator .monitoring .micrometer ;
2
2
3
- import java .util .ArrayList ;
4
- import java .util .Collections ;
5
- import java .util .List ;
6
- import java .util .Map ;
7
- import java .util .Optional ;
3
+ import java .util .*;
8
4
import java .util .concurrent .ConcurrentHashMap ;
5
+ import java .util .concurrent .Executors ;
6
+ import java .util .concurrent .ScheduledExecutorService ;
7
+ import java .util .concurrent .TimeUnit ;
9
8
import java .util .concurrent .atomic .AtomicInteger ;
10
9
11
10
import io .fabric8 .kubernetes .api .model .HasMetadata ;
17
16
import io .javaoperatorsdk .operator .processing .GroupVersionKind ;
18
17
import io .javaoperatorsdk .operator .processing .event .Event ;
19
18
import io .javaoperatorsdk .operator .processing .event .ResourceID ;
19
+ import io .micrometer .core .instrument .Meter ;
20
20
import io .micrometer .core .instrument .MeterRegistry ;
21
21
import io .micrometer .core .instrument .Tag ;
22
22
import io .micrometer .core .instrument .Timer ;
@@ -31,9 +31,17 @@ public class MicrometerMetrics implements Metrics {
31
31
private static final String RECONCILIATIONS_QUEUE_SIZE = PREFIX + RECONCILIATIONS + "queue.size." ;
32
32
private final MeterRegistry registry ;
33
33
private final Map <String , AtomicInteger > gauges = new ConcurrentHashMap <>();
34
+ private final Map <ResourceID , Set <Meter .Id >> metersPerResource = new ConcurrentHashMap <>();
35
+ private final ScheduledExecutorService metersCleaner = Executors .newScheduledThreadPool (10 );
36
+ private final int cleanUpDelayInSeconds ;
34
37
35
38
public MicrometerMetrics (MeterRegistry registry ) {
39
+ this (registry , 300 );
40
+ }
41
+
42
+ public MicrometerMetrics (MeterRegistry registry , int cleanUpDelayInSeconds ) {
36
43
this .registry = registry ;
44
+ this .cleanUpDelayInSeconds = cleanUpDelayInSeconds ;
37
45
}
38
46
39
47
@ Override
@@ -116,6 +124,14 @@ public void receivedEvent(Event event, Map<String, Object> metadata) {
116
124
@ Override
117
125
public void cleanupDoneFor (ResourceID resourceID , Map <String , Object > metadata ) {
118
126
incrementCounter (resourceID , "events.delete" , metadata );
127
+
128
+ // schedule deletion of meters associated with ResourceID
129
+ metersCleaner .schedule (() -> {
130
+ final var toClean = metersPerResource .get (resourceID );
131
+ if (toClean != null ) {
132
+ toClean .forEach (registry ::remove );
133
+ }
134
+ }, cleanUpDelayInSeconds , TimeUnit .SECONDS );
119
135
}
120
136
121
137
@ Override
@@ -125,11 +141,11 @@ public void reconcileCustomResource(HasMetadata resource, RetryInfo retryInfoNul
125
141
incrementCounter (ResourceID .fromResource (resource ), RECONCILIATIONS + "started" ,
126
142
metadata ,
127
143
RECONCILIATIONS + "retries.number" ,
128
- "" + retryInfo .map (RetryInfo ::getAttemptCount ).orElse (0 ),
144
+ String . valueOf ( retryInfo .map (RetryInfo ::getAttemptCount ).orElse (0 ) ),
129
145
RECONCILIATIONS + "retries.last" ,
130
- "" + retryInfo .map (RetryInfo ::isLastAttempt ).orElse (true ));
146
+ String . valueOf ( retryInfo .map (RetryInfo ::isLastAttempt ).orElse (true ) ));
131
147
132
- AtomicInteger controllerQueueSize =
148
+ var controllerQueueSize =
133
149
gauges .get (RECONCILIATIONS_QUEUE_SIZE + metadata .get (CONTROLLER_NAME ));
134
150
controllerQueueSize .incrementAndGet ();
135
151
}
@@ -141,18 +157,18 @@ public void finishedReconciliation(HasMetadata resource, Map<String, Object> met
141
157
142
158
@ Override
143
159
public void reconciliationExecutionStarted (HasMetadata resource , Map <String , Object > metadata ) {
144
- AtomicInteger reconcilerExecutions =
160
+ var reconcilerExecutions =
145
161
gauges .get (RECONCILIATIONS_EXECUTIONS + metadata .get (CONTROLLER_NAME ));
146
162
reconcilerExecutions .incrementAndGet ();
147
163
}
148
164
149
165
@ Override
150
166
public void reconciliationExecutionFinished (HasMetadata resource , Map <String , Object > metadata ) {
151
- AtomicInteger reconcilerExecutions =
167
+ var reconcilerExecutions =
152
168
gauges .get (RECONCILIATIONS_EXECUTIONS + metadata .get (CONTROLLER_NAME ));
153
169
reconcilerExecutions .decrementAndGet ();
154
170
155
- AtomicInteger controllerQueueSize =
171
+ var controllerQueueSize =
156
172
gauges .get (RECONCILIATIONS_QUEUE_SIZE + metadata .get (CONTROLLER_NAME ));
157
173
controllerQueueSize .decrementAndGet ();
158
174
}
@@ -202,6 +218,8 @@ private void incrementCounter(ResourceID id, String counterName, Map<String, Obj
202
218
"version" , gvk .version ,
203
219
"kind" , gvk .kind ));
204
220
}
205
- registry .counter (PREFIX + counterName , tags .toArray (new String [0 ])).increment ();
221
+ final var counter = registry .counter (PREFIX + counterName , tags .toArray (new String [0 ]));
222
+ metersPerResource .computeIfAbsent (id , resourceID -> new HashSet <>()).add (counter .getId ());
223
+ counter .increment ();
206
224
}
207
225
}
0 commit comments