) eventSource);
+ switch (action) {
+ case ADDED:
+ lifecycleAwareES.onResourceCreated(resource);
+ break;
+ case UPDATED:
+ lifecycleAwareES.onResourceUpdated(resource, oldResource);
+ break;
+ case DELETED:
+ lifecycleAwareES.onResourceDeleted(resource);
+ break;
+ }
}
- } finally {
- lock.unlock();
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceID.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceID.java
index 515be245b8..38c9055ff1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceID.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceID.java
@@ -1,11 +1,12 @@
package io.javaoperatorsdk.operator.processing.event;
+import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import io.fabric8.kubernetes.api.model.HasMetadata;
-public class ResourceID {
+public class ResourceID implements Serializable {
public static ResourceID fromResource(HasMetadata resource) {
return new ResourceID(resource.getMetadata().getName(),
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java
index 555da0d1b1..ddc787ad2d 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java
@@ -10,4 +10,5 @@ public abstract class AbstractEventSource implements EventSource {
public void setEventHandler(EventHandler eventHandler) {
this.eventHandler = eventHandler;
}
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java
new file mode 100644
index 0000000000..9a2be41a70
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java
@@ -0,0 +1,72 @@
+package io.javaoperatorsdk.operator.processing.event.source;
+
+import java.util.Optional;
+
+import javax.cache.Cache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.javaoperatorsdk.operator.OperatorException;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+/**
+ * Base class for event sources with caching capabilities.
+ *
+ * {@link #handleDelete(ResourceID)} - if the related resource is present in the cache it is removed
+ * and event propagated. There is no event propagated if the resource is not in the cache.
+ *
+ * {@link #handleEvent(Object, ResourceID)} - caches the resource if changed or missing. Propagates
+ * an event if the resource is new or not equals to the one in the cache, and if accepted by the
+ * filter if one is present.
+ *
+ * @param represents the type of resources (usually external non-kubernetes ones) being handled.
+ */
+public abstract class CachingEventSource extends LifecycleAwareEventSource {
+
+ private static final Logger log = LoggerFactory.getLogger(CachingEventSource.class);
+
+ protected Cache cache;
+
+ public CachingEventSource(Cache cache) {
+ this.cache = cache;
+ }
+
+ protected void handleDelete(ResourceID relatedResourceID) {
+ if (!isRunning()) {
+ return;
+ }
+ var cachedValue = cache.get(relatedResourceID);
+ cache.remove(relatedResourceID);
+ // we only propagate event if the resource was previously in cache
+ if (cachedValue != null) {
+ eventHandler.handleEvent(new Event(relatedResourceID));
+ }
+ }
+
+ protected void handleEvent(T value, ResourceID relatedResourceID) {
+ if (!isRunning()) {
+ return;
+ }
+ var cachedValue = cache.get(relatedResourceID);
+ if (cachedValue == null || !cachedValue.equals(value)) {
+ cache.put(relatedResourceID, value);
+ eventHandler.handleEvent(new Event(relatedResourceID));
+ }
+ }
+
+ public Cache getCache() {
+ return cache;
+ }
+
+ public Optional getCachedValue(ResourceID resourceID) {
+ return Optional.ofNullable(cache.get(resourceID));
+ }
+
+ @Override
+ public void stop() throws OperatorException {
+ super.stop();
+ cache.close();
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
index e0d0bc45e1..18e47d03db 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
@@ -2,16 +2,9 @@
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
-import io.javaoperatorsdk.operator.processing.event.ResourceID;
public interface EventSource extends LifecycleAware {
void setEventHandler(EventHandler eventHandler);
- /**
- * Automatically called when a custom resource is deleted from the cluster.
- *
- * @param customResourceUid - id of custom resource
- */
- default void cleanupForResource(ResourceID customResourceUid) {}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceRegistry.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceRegistry.java
index a2d38690cc..dca5436427 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceRegistry.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceRegistry.java
@@ -4,6 +4,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
+import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
public interface EventSourceRegistry {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/LifecycleAwareEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/LifecycleAwareEventSource.java
new file mode 100644
index 0000000000..6b2d79fd1a
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/LifecycleAwareEventSource.java
@@ -0,0 +1,22 @@
+package io.javaoperatorsdk.operator.processing.event.source;
+
+import io.javaoperatorsdk.operator.OperatorException;
+
+public abstract class LifecycleAwareEventSource extends AbstractEventSource {
+
+ private volatile boolean running = false;
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ public void start() throws OperatorException {
+ running = true;
+ }
+
+ @Override
+ public void stop() throws OperatorException {
+ running = false;
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java
deleted file mode 100644
index ff0b4673be..0000000000
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package io.javaoperatorsdk.operator.processing.event.source;
-
-public enum ResourceAction {
- ADDED, UPDATED, DELETED
-}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventAware.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventAware.java
new file mode 100644
index 0000000000..dcb15a4229
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventAware.java
@@ -0,0 +1,13 @@
+package io.javaoperatorsdk.operator.processing.event.source;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+
+public interface ResourceEventAware {
+
+ default void onResourceCreated(T resource) {}
+
+ default void onResourceUpdated(T newResource, T oldResource) {}
+
+ default void onResourceDeleted(T resource) {}
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceCache.java
similarity index 94%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceCache.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceCache.java
index 2186277e32..2397af9573 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceCache.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceCache.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.controller;
import java.util.Map;
import java.util.Optional;
@@ -11,7 +11,7 @@
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
-import static io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;
+import static io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;
public class ControllerResourceCache implements ResourceCache {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java
similarity index 83%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceEventSource.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java
index faf0d8f33b..2386cd4dbc 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.controller;
import java.util.Collections;
import java.util.Map;
@@ -20,6 +20,7 @@
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
@@ -75,30 +76,20 @@ public void start() {
try {
if (ControllerConfiguration.allNamespacesWatched(targetNamespaces)) {
- final var filteredBySelectorClient = client.inAnyNamespace()
- .withLabelSelector(labelSelector);
final var informer =
- createAndRunInformerFor(filteredBySelectorClient, ANY_NAMESPACE_MAP_KEY);
+ createAndRunInformerFor(client.inAnyNamespace()
+ .withLabelSelector(labelSelector), ANY_NAMESPACE_MAP_KEY);
log.debug("Registered {} -> {} for any namespace", controller, informer);
} else {
- targetNamespaces.forEach(
- ns -> {
- final var informer = createAndRunInformerFor(
- client.inNamespace(ns).withLabelSelector(labelSelector), ns);
- log.debug("Registered {} -> {} for namespace: {}", controller, informer,
- ns);
- });
+ targetNamespaces.forEach(ns -> {
+ final var informer = createAndRunInformerFor(
+ client.inNamespace(ns).withLabelSelector(labelSelector), ns);
+ log.debug("Registered {} -> {} for namespace: {}", controller, informer, ns);
+ });
}
} catch (Exception e) {
if (e instanceof KubernetesClientException) {
- KubernetesClientException ke = (KubernetesClientException) e;
- if (404 == ke.getCode()) {
- // only throw MissingCRDException if the 404 error occurs on the target CRD
- final var targetCRDName = controller.getConfiguration().getResourceTypeName();
- if (targetCRDName.equals(ke.getFullResourceName())) {
- throw new MissingCRDException(targetCRDName, null, e.getMessage(), e);
- }
- }
+ handleKubernetesClientException(e);
}
throw e;
}
@@ -130,6 +121,8 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
log.debug(
"Event received for resource: {}", getName(customResource));
MDCUtils.addResourceInfo(customResource);
+ controller.getEventSourceManager().broadcastOnResourceEvent(action, customResource,
+ oldResource);
if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) {
eventHandler.handleEvent(
new ResourceEvent(action, ResourceID.fromResource(customResource)));
@@ -191,4 +184,16 @@ public void whitelistNextEvent(ResourceID resourceID) {
}
}
+
+ private void handleKubernetesClientException(Exception e) {
+ KubernetesClientException ke = (KubernetesClientException) e;
+ if (404 == ke.getCode()) {
+ // only throw MissingCRDException if the 404 error occurs on the target CRD
+ final var targetCRDName = controller.getConfiguration().getResourceTypeName();
+ if (targetCRDName.equals(ke.getFullResourceName())) {
+ throw new MissingCRDException(targetCRDName, null, e.getMessage(), e);
+ }
+ }
+ }
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java
similarity index 94%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilter.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java
index aba822dcf6..8262ff1c21 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilter.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.controller;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java
new file mode 100644
index 0000000000..7a04dc9164
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java
@@ -0,0 +1,5 @@
+package io.javaoperatorsdk.operator.processing.event.source.controller;
+
+public enum ResourceAction {
+ ADDED, UPDATED, DELETED
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceCache.java
similarity index 89%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceCache.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceCache.java
index af156e24ec..5508dd43eb 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceCache.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceCache.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.controller;
import java.util.Optional;
import java.util.function.Predicate;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java
similarity index 88%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEvent.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java
index 7610a880b7..ad1d85330c 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEvent.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.controller;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilter.java
similarity index 97%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilter.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilter.java
index 01f7e1aec0..497c9016b7 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilter.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilter.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.controller;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilters.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilters.java
similarity index 98%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilters.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilters.java
index 9d90ebd963..43fe410fbc 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilters.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilters.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.controller;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.CustomResource;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java
new file mode 100644
index 0000000000..51d1ed287d
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java
@@ -0,0 +1,21 @@
+package io.javaoperatorsdk.operator.processing.event.source.inbound;
+
+import javax.cache.Cache;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;
+
+public class CachingInboundEventSource extends CachingEventSource {
+
+ public CachingInboundEventSource(Cache cache) {
+ super(cache);
+ }
+
+ public void handleResourceEvent(T resource, ResourceID relatedResourceID) {
+ super.handleEvent(resource, relatedResourceID);
+ }
+
+ public void handleResourceDeleteEvent(ResourceID resourceID) {
+ super.handleDelete(resourceID);
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/SimpleInboundEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/SimpleInboundEventSource.java
new file mode 100644
index 0000000000..475cfee916
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/SimpleInboundEventSource.java
@@ -0,0 +1,22 @@
+package io.javaoperatorsdk.operator.processing.event.source.inbound;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.LifecycleAwareEventSource;
+
+public class SimpleInboundEventSource extends LifecycleAwareEventSource {
+
+ private static final Logger log = LoggerFactory.getLogger(SimpleInboundEventSource.class);
+
+ public void propagateEvent(ResourceID resourceID) {
+ if (isRunning()) {
+ eventHandler.handleEvent(new Event(resourceID));
+ } else {
+ log.debug("Event source not started yet, not propagating event for: {}", resourceID);
+ }
+ }
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
similarity index 96%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/InformerEventSource.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
index 3bdc8fb764..8095289221 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/InformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.informer;
import java.util.Objects;
import java.util.Set;
@@ -15,6 +15,7 @@
import io.fabric8.kubernetes.client.informers.cache.Store;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource;
public class InformerEventSource extends AbstractEventSource {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Mappers.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java
similarity index 95%
rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Mappers.java
rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java
index d428bb26cc..c578490147 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Mappers.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java
@@ -1,4 +1,4 @@
-package io.javaoperatorsdk.operator.processing.event.source;
+package io.javaoperatorsdk.operator.processing.event.source.informer;
import java.util.Collections;
import java.util.Set;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
new file mode 100644
index 0000000000..bf9b41cf0e
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
@@ -0,0 +1,146 @@
+package io.javaoperatorsdk.operator.processing.event.source.polling;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+
+import javax.cache.Cache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.OperatorException;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
+import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;
+
+/**
+ *
+ * Polls the supplier for each controlled resource registered. Resource is registered when created
+ * if there is no registerPredicate provided. If register predicate provided it is evaluated on
+ * resource create and/or update to register polling for the event source.
+ *
+ * For other behavior see {@link CachingEventSource}
+ *
+ * @param the resource polled by the event source
+ * @param related custom resource
+ */
+public class PerResourcePollingEventSource
+ extends CachingEventSource
+ implements ResourceEventAware {
+
+ private static final Logger log = LoggerFactory.getLogger(PerResourcePollingEventSource.class);
+
+ private final Timer timer = new Timer();
+ private final Map timerTasks = new ConcurrentHashMap<>();
+ private final ResourceSupplier resourceSupplier;
+ private final ResourceCache resourceCache;
+ private final Predicate registerPredicate;
+ private final long period;
+
+ public PerResourcePollingEventSource(ResourceSupplier resourceSupplier,
+ ResourceCache resourceCache, long period, Cache cache) {
+ this(resourceSupplier, resourceCache, period, cache, null);
+ }
+
+ public PerResourcePollingEventSource(ResourceSupplier resourceSupplier,
+ ResourceCache resourceCache, long period, Cache cache,
+ Predicate registerPredicate) {
+ super(cache);
+ this.resourceSupplier = resourceSupplier;
+ this.resourceCache = resourceCache;
+ this.period = period;
+ this.registerPredicate = registerPredicate;
+ }
+
+ private void pollForResource(R resource) {
+ var value = resourceSupplier.getResources(resource);
+ var resourceID = ResourceID.fromResource(resource);
+ if (value.isEmpty()) {
+ super.handleDelete(resourceID);
+ } else {
+ super.handleEvent(value.get(), resourceID);
+ }
+ }
+
+ private Optional getAndCacheResource(ResourceID resourceID) {
+ var resource = resourceCache.get(resourceID);
+ if (resource.isPresent()) {
+ var value = resourceSupplier.getResources(resource.get());
+ value.ifPresent(v -> cache.put(resourceID, v));
+ return value;
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public void onResourceCreated(R resource) {
+ checkAndRegisterTask(resource);
+ }
+
+ @Override
+ public void onResourceUpdated(R newResource, R oldResource) {
+ checkAndRegisterTask(newResource);
+ }
+
+ @Override
+ public void onResourceDeleted(R resource) {
+ var resourceID = ResourceID.fromResource(resource);
+ TimerTask task = timerTasks.remove(resourceID);
+ if (task != null) {
+ task.cancel();
+ }
+ cache.remove(resourceID);
+ }
+
+ private void checkAndRegisterTask(R resource) {
+ var resourceID = ResourceID.fromResource(resource);
+ if (timerTasks.get(resourceID) == null && (registerPredicate == null
+ || registerPredicate.test(resource))) {
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ if (!isRunning()) {
+ log.debug("Event source not yet started. Will not run for: {}", resourceID);
+ return;
+ }
+ // always use up-to-date resource from cache
+ var res = resourceCache.get(resourceID);
+ res.ifPresentOrElse(r -> pollForResource(r),
+ () -> log.warn("No resource in cache for resource ID: {}", resourceID));
+ }
+ }, 0, period);
+ }
+ }
+
+ /**
+ *
+ * @param resourceID of the target related resource
+ * @return the cached value of the resource, if not present it gets the resource from the
+ * supplier. The value provided from the supplier is cached, but no new event is
+ * propagated.
+ */
+ public Optional getValueFromCacheOrSupplier(ResourceID resourceID) {
+ var cachedValue = getCachedValue(resourceID);
+ if (cachedValue.isPresent()) {
+ return cachedValue;
+ } else {
+ return getAndCacheResource(resourceID);
+ }
+ }
+
+ public interface ResourceSupplier {
+ Optional getResources(R resource);
+ }
+
+ @Override
+ public void stop() throws OperatorException {
+ super.stop();
+ timer.cancel();
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
new file mode 100644
index 0000000000..b2c3fdff78
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
@@ -0,0 +1,68 @@
+package io.javaoperatorsdk.operator.processing.event.source.polling;
+
+import java.util.*;
+import java.util.function.Supplier;
+import java.util.stream.StreamSupport;
+
+import javax.cache.Cache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.javaoperatorsdk.operator.OperatorException;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;
+
+public class PollingEventSource extends CachingEventSource {
+
+ private static final Logger log = LoggerFactory.getLogger(PollingEventSource.class);
+
+ private final Timer timer = new Timer();
+ private final Supplier