Skip to content

Event Notification and Polling, Inbound Event Sources #720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion operator-framework-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
Expand All @@ -106,5 +105,20 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>jcache</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilters;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;

public interface ControllerConfiguration<R extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;

public class ControllerConfigurationOverrider<R extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;

public class DefaultControllerConfiguration<R extends HasMetadata>
implements ControllerConfiguration<R> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ private boolean failOnMissingCurrentNS() {
return false;
}

public EventSourceManager<R> getEventSourceManager() {
return eventSourceManager;
}

public void stop() {
if (eventSourceManager != null) {
eventSourceManager.stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.Objects;

public class Event {

private final ResourceID relatedCustomResource;
Expand All @@ -18,4 +20,19 @@ public String toString() {
"relatedCustomResource=" + relatedCustomResource +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Event event = (Event) o;
return Objects.equals(relatedCustomResource, event.relatedCustomResource);
}

@Override
public int hashCode() {
return Objects.hash(relatedCustomResource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
Expand Down Expand Up @@ -297,7 +297,6 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
}

private void cleanupForDeletedEvent(ResourceID customResourceUid) {
eventSourceManager.cleanupForCustomResource(customResourceUid);
eventMarker.cleanup(customResourceUid);
metrics.cleanupDoneFor(customResourceUid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;

public class EventSourceManager<R extends HasMetadata>
implements EventSourceRegistry<R>, LifecycleAware {
Expand Down Expand Up @@ -107,14 +109,22 @@ public final void registerEventSource(EventSource eventSource)
}
}

public void cleanupForCustomResource(ResourceID customResourceUid) {
lock.lock();
try {
for (EventSource eventSource : this.eventSources) {
eventSource.cleanupForResource(customResourceUid);
public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) {
for (EventSource eventSource : this.eventSources) {
if (eventSource instanceof ResourceEventAware) {
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource);
switch (action) {
case ADDED:
lifecycleAwareES.onResourceCreated(resource);
break;
case UPDATED:
lifecycleAwareES.onResourceUpdated(resource, oldResource);
break;
case DELETED:
lifecycleAwareES.onResourceDeleted(resource);
break;
}
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.javaoperatorsdk.operator.processing.event;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;

public class ResourceID {
public class ResourceID implements Serializable {

public static ResourceID fromResource(HasMetadata resource) {
return new ResourceID(resource.getMetadata().getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ public abstract class AbstractEventSource implements EventSource {
public void setEventHandler(EventHandler eventHandler) {
this.eventHandler = eventHandler;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.Optional;

import javax.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

/**
* Base class for event sources with caching capabilities.
* <p>
* {@link #handleDelete(ResourceID)} - if the related resource is present in the cache it is removed
* and event propagated. There is no event propagated if the resource is not in the cache.
* <p>
* {@link #handleEvent(Object, ResourceID)} - caches the resource if changed or missing. Propagates
* an event if the resource is new or not equals to the one in the cache, and if accepted by the
* filter if one is present.
*
* @param <T> represents the type of resources (usually external non-kubernetes ones) being handled.
*/
public abstract class CachingEventSource<T> extends LifecycleAwareEventSource {

private static final Logger log = LoggerFactory.getLogger(CachingEventSource.class);

protected Cache<ResourceID, T> cache;

public CachingEventSource(Cache<ResourceID, T> cache) {
this.cache = cache;
}

protected void handleDelete(ResourceID relatedResourceID) {
if (!isRunning()) {
return;
}
var cachedValue = cache.get(relatedResourceID);
cache.remove(relatedResourceID);
// we only propagate event if the resource was previously in cache
if (cachedValue != null) {
eventHandler.handleEvent(new Event(relatedResourceID));
}
}

protected void handleEvent(T value, ResourceID relatedResourceID) {
if (!isRunning()) {
return;
}
var cachedValue = cache.get(relatedResourceID);
if (cachedValue == null || !cachedValue.equals(value)) {
cache.put(relatedResourceID, value);
eventHandler.handleEvent(new Event(relatedResourceID));
}
}

public Cache<ResourceID, T> getCache() {
return cache;
}

public Optional<T> getCachedValue(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}

@Override
public void stop() throws OperatorException {
super.stop();
cache.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@

import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public interface EventSource extends LifecycleAware {

void setEventHandler(EventHandler eventHandler);

/**
* Automatically called when a custom resource is deleted from the cluster.
*
* @param customResourceUid - id of custom resource
*/
default void cleanupForResource(ResourceID customResourceUid) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;

public interface EventSourceRegistry<T extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.javaoperatorsdk.operator.processing.event.source;

import io.javaoperatorsdk.operator.OperatorException;

public abstract class LifecycleAwareEventSource extends AbstractEventSource {

private volatile boolean running = false;

public boolean isRunning() {
return running;
}

@Override
public void start() throws OperatorException {
running = true;
}

@Override
public void stop() throws OperatorException {
running = false;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.javaoperatorsdk.operator.processing.event.source;

import io.fabric8.kubernetes.api.model.HasMetadata;

public interface ResourceEventAware<T extends HasMetadata> {

default void onResourceCreated(T resource) {}

default void onResourceUpdated(T newResource, T oldResource) {}

default void onResourceDeleted(T resource) {}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.javaoperatorsdk.operator.processing.event.source;
package io.javaoperatorsdk.operator.processing.event.source.controller;

import java.util.Map;
import java.util.Optional;
Expand All @@ -11,7 +11,7 @@
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

import static io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;
import static io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;

public class ControllerResourceCache<T extends HasMetadata> implements ResourceCache<T> {

Expand Down
Loading