Skip to content

Commit 2e9bd22

Browse files
authored
feat: Event Notification and Polling, Inbound Event Sources (#720)
1 parent cc335df commit 2e9bd22

File tree

53 files changed

+1221
-214
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1221
-214
lines changed

operator-framework-core/pom.xml

+15-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@
9595
<artifactId>assertj-core</artifactId>
9696
<scope>test</scope>
9797
</dependency>
98-
9998
<dependency>
10099
<groupId>io.fabric8</groupId>
101100
<artifactId>kubernetes-server-mock</artifactId>
@@ -106,5 +105,20 @@
106105
<artifactId>awaitility</artifactId>
107106
<scope>test</scope>
108107
</dependency>
108+
<dependency>
109+
<groupId>javax.cache</groupId>
110+
<artifactId>cache-api</artifactId>
111+
<version>${jcache.version}</version>
112+
</dependency>
113+
<dependency>
114+
<groupId>com.github.ben-manes.caffeine</groupId>
115+
<artifactId>caffeine</artifactId>
116+
<scope>test</scope>
117+
</dependency>
118+
<dependency>
119+
<groupId>com.github.ben-manes.caffeine</groupId>
120+
<artifactId>jcache</artifactId>
121+
<scope>test</scope>
122+
</dependency>
109123
</dependencies>
110124
</project>

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import io.fabric8.kubernetes.api.model.HasMetadata;
88
import io.fabric8.kubernetes.client.CustomResource;
99
import io.javaoperatorsdk.operator.ReconcilerUtils;
10-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
11-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilters;
10+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
11+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;
1212

1313
public interface ControllerConfiguration<R extends HasMetadata> {
1414

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.util.Set;
66

77
import io.fabric8.kubernetes.api.model.HasMetadata;
8-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
8+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
99

1010
public class ControllerConfigurationOverrider<R extends HasMetadata> {
1111

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.util.Set;
55

66
import io.fabric8.kubernetes.api.model.HasMetadata;
7-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
7+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
88

99
public class DefaultControllerConfiguration<R extends HasMetadata>
1010
implements ControllerConfiguration<R> {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.lang.annotation.RetentionPolicy;
66
import java.lang.annotation.Target;
77

8-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
8+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
99

1010
@Retention(RetentionPolicy.RUNTIME)
1111
@Target({ElementType.TYPE})

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

+4
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,10 @@ private boolean failOnMissingCurrentNS() {
212212
return false;
213213
}
214214

215+
public EventSourceManager<R> getEventSourceManager() {
216+
return eventSourceManager;
217+
}
218+
215219
public void stop() {
216220
if (eventSourceManager != null) {
217221
eventSourceManager.stop();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/Event.java

+17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.util.Objects;
4+
35
public class Event {
46

57
private final ResourceID relatedCustomResource;
@@ -18,4 +20,19 @@ public String toString() {
1820
"relatedCustomResource=" + relatedCustomResource +
1921
'}';
2022
}
23+
24+
@Override
25+
public boolean equals(Object o) {
26+
if (this == o)
27+
return true;
28+
if (o == null || getClass() != o.getClass())
29+
return false;
30+
Event event = (Event) o;
31+
return Objects.equals(relatedCustomResource, event.relatedCustomResource);
32+
}
33+
34+
@Override
35+
public int hashCode() {
36+
return Objects.hash(relatedCustomResource);
37+
}
2138
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
2121
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2222
import io.javaoperatorsdk.operator.processing.MDCUtils;
23-
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
24-
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
25-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEvent;
26-
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
23+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
24+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;
25+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
26+
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
2727
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
2828
import io.javaoperatorsdk.operator.processing.retry.Retry;
2929
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
@@ -297,7 +297,6 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
297297
}
298298

299299
private void cleanupForDeletedEvent(ResourceID customResourceUid) {
300-
eventSourceManager.cleanupForCustomResource(customResourceUid);
301300
eventMarker.cleanup(customResourceUid);
302301
metrics.cleanupDoneFor(customResourceUid);
303302
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import io.javaoperatorsdk.operator.OperatorException;
1212
import io.javaoperatorsdk.operator.processing.Controller;
1313
import io.javaoperatorsdk.operator.processing.LifecycleAware;
14-
import io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource;
1514
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
1615
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
17-
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
16+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
17+
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
18+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
19+
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
1820

1921
public class EventSourceManager<R extends HasMetadata>
2022
implements EventSourceRegistry<R>, LifecycleAware {
@@ -107,14 +109,22 @@ public final void registerEventSource(EventSource eventSource)
107109
}
108110
}
109111

110-
public void cleanupForCustomResource(ResourceID customResourceUid) {
111-
lock.lock();
112-
try {
113-
for (EventSource eventSource : this.eventSources) {
114-
eventSource.cleanupForResource(customResourceUid);
112+
public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) {
113+
for (EventSource eventSource : this.eventSources) {
114+
if (eventSource instanceof ResourceEventAware) {
115+
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource);
116+
switch (action) {
117+
case ADDED:
118+
lifecycleAwareES.onResourceCreated(resource);
119+
break;
120+
case UPDATED:
121+
lifecycleAwareES.onResourceUpdated(resource, oldResource);
122+
break;
123+
case DELETED:
124+
lifecycleAwareES.onResourceDeleted(resource);
125+
break;
126+
}
115127
}
116-
} finally {
117-
lock.unlock();
118128
}
119129
}
120130

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceID.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.io.Serializable;
34
import java.util.Objects;
45
import java.util.Optional;
56

67
import io.fabric8.kubernetes.api.model.HasMetadata;
78

8-
public class ResourceID {
9+
public class ResourceID implements Serializable {
910

1011
public static ResourceID fromResource(HasMetadata resource) {
1112
return new ResourceID(resource.getMetadata().getName(),

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java

+1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ public abstract class AbstractEventSource implements EventSource {
1010
public void setEventHandler(EventHandler eventHandler) {
1111
this.eventHandler = eventHandler;
1212
}
13+
1314
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
import java.util.Optional;
4+
5+
import javax.cache.Cache;
6+
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import io.javaoperatorsdk.operator.OperatorException;
11+
import io.javaoperatorsdk.operator.processing.event.Event;
12+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
13+
14+
/**
15+
* Base class for event sources with caching capabilities.
16+
* <p>
17+
* {@link #handleDelete(ResourceID)} - if the related resource is present in the cache it is removed
18+
* and event propagated. There is no event propagated if the resource is not in the cache.
19+
* <p>
20+
* {@link #handleEvent(Object, ResourceID)} - caches the resource if changed or missing. Propagates
21+
* an event if the resource is new or not equals to the one in the cache, and if accepted by the
22+
* filter if one is present.
23+
*
24+
* @param <T> represents the type of resources (usually external non-kubernetes ones) being handled.
25+
*/
26+
public abstract class CachingEventSource<T> extends LifecycleAwareEventSource {
27+
28+
private static final Logger log = LoggerFactory.getLogger(CachingEventSource.class);
29+
30+
protected Cache<ResourceID, T> cache;
31+
32+
public CachingEventSource(Cache<ResourceID, T> cache) {
33+
this.cache = cache;
34+
}
35+
36+
protected void handleDelete(ResourceID relatedResourceID) {
37+
if (!isRunning()) {
38+
return;
39+
}
40+
var cachedValue = cache.get(relatedResourceID);
41+
cache.remove(relatedResourceID);
42+
// we only propagate event if the resource was previously in cache
43+
if (cachedValue != null) {
44+
eventHandler.handleEvent(new Event(relatedResourceID));
45+
}
46+
}
47+
48+
protected void handleEvent(T value, ResourceID relatedResourceID) {
49+
if (!isRunning()) {
50+
return;
51+
}
52+
var cachedValue = cache.get(relatedResourceID);
53+
if (cachedValue == null || !cachedValue.equals(value)) {
54+
cache.put(relatedResourceID, value);
55+
eventHandler.handleEvent(new Event(relatedResourceID));
56+
}
57+
}
58+
59+
public Cache<ResourceID, T> getCache() {
60+
return cache;
61+
}
62+
63+
public Optional<T> getCachedValue(ResourceID resourceID) {
64+
return Optional.ofNullable(cache.get(resourceID));
65+
}
66+
67+
@Override
68+
public void stop() throws OperatorException {
69+
super.stop();
70+
cache.close();
71+
}
72+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java

-7
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,9 @@
22

33
import io.javaoperatorsdk.operator.processing.LifecycleAware;
44
import io.javaoperatorsdk.operator.processing.event.EventHandler;
5-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
65

76
public interface EventSource extends LifecycleAware {
87

98
void setEventHandler(EventHandler eventHandler);
109

11-
/**
12-
* Automatically called when a custom resource is deleted from the cluster.
13-
*
14-
* @param customResourceUid - id of custom resource
15-
*/
16-
default void cleanupForResource(ResourceID customResourceUid) {}
1710
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceRegistry.java

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.fabric8.kubernetes.api.model.HasMetadata;
66
import io.javaoperatorsdk.operator.OperatorException;
7+
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
78

89
public interface EventSourceRegistry<T extends HasMetadata> {
910

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
import io.javaoperatorsdk.operator.OperatorException;
4+
5+
public abstract class LifecycleAwareEventSource extends AbstractEventSource {
6+
7+
private volatile boolean running = false;
8+
9+
public boolean isRunning() {
10+
return running;
11+
}
12+
13+
@Override
14+
public void start() throws OperatorException {
15+
running = true;
16+
}
17+
18+
@Override
19+
public void stop() throws OperatorException {
20+
running = false;
21+
}
22+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java

-5
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
5+
public interface ResourceEventAware<T extends HasMetadata> {
6+
7+
default void onResourceCreated(T resource) {}
8+
9+
default void onResourceUpdated(T newResource, T oldResource) {}
10+
11+
default void onResourceDeleted(T resource) {}
12+
13+
}
+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.javaoperatorsdk.operator.processing.event.source;
1+
package io.javaoperatorsdk.operator.processing.event.source.controller;
22

33
import java.util.Map;
44
import java.util.Optional;
@@ -11,7 +11,7 @@
1111
import io.javaoperatorsdk.operator.api.config.Cloner;
1212
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1313

14-
import static io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;
14+
import static io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;
1515

1616
public class ControllerResourceCache<T extends HasMetadata> implements ResourceCache<T> {
1717

0 commit comments

Comments
 (0)