Skip to content

Commit 5098ffc

Browse files
committed
feat: bounded cache for informers (#1718)
1 parent d6e746e commit 5098ffc

31 files changed

+1071
-26
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>java-operator-sdk</artifactId>
7+
<groupId>io.javaoperatorsdk</groupId>
8+
<version>4.3.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>caffeine-bounded-cache-support</artifactId>
13+
<name>Operator SDK - Caffeine Bounded Cache Support</name>
14+
15+
<properties>
16+
<maven.compiler.source>11</maven.compiler.source>
17+
<maven.compiler.target>11</maven.compiler.target>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>io.javaoperatorsdk</groupId>
23+
<artifactId>operator-framework-core</artifactId>
24+
</dependency>
25+
<dependency>
26+
<groupId>com.github.ben-manes.caffeine</groupId>
27+
<artifactId>caffeine</artifactId>
28+
</dependency>
29+
<dependency>
30+
<groupId>io.javaoperatorsdk</groupId>
31+
<artifactId>operator-framework</artifactId>
32+
<scope>test</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>io.javaoperatorsdk</groupId>
36+
<artifactId>operator-framework-junit-5</artifactId>
37+
<version>${project.version}</version>
38+
<scope>test</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>io.fabric8</groupId>
42+
<artifactId>crd-generator-apt</artifactId>
43+
<scope>test</scope>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.apache.logging.log4j</groupId>
47+
<artifactId>log4j-slf4j-impl</artifactId>
48+
<scope>test</scope>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.apache.logging.log4j</groupId>
52+
<artifactId>log4j-core</artifactId>
53+
<version>${log4j.version}</version>
54+
<type>test-jar</type>
55+
<scope>test</scope>
56+
</dependency>
57+
</dependencies>
58+
59+
<build>
60+
<plugins>
61+
<plugin>
62+
<artifactId>maven-compiler-plugin</artifactId>
63+
<version>${maven-compiler-plugin.version}</version>
64+
<executions>
65+
<!-- During compilation we need to disable annotation processors (at least the ControllerConfigurationAnnotationProcessor).
66+
However, this is needed to compile the tests so let's disable apt just for the compile phase -->
67+
<execution>
68+
<id>default-compile</id>
69+
<phase>compile</phase>
70+
<goals>
71+
<goal>compile</goal>
72+
</goals>
73+
<configuration>
74+
<compilerArgs>
75+
<arg>-proc:none</arg>
76+
</compilerArgs>
77+
</configuration>
78+
</execution>
79+
</executions>
80+
</plugin>
81+
</plugins>
82+
</build>
83+
84+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
import com.github.benmanes.caffeine.cache.Cache;
4+
5+
/**
6+
* Caffein cache wrapper to be used in a {@link BoundedItemStore}
7+
*/
8+
public class CaffeineBoundedCache<K, R> implements BoundedCache<K, R> {
9+
10+
private Cache<K, R> cache;
11+
12+
public CaffeineBoundedCache(Cache<K, R> cache) {
13+
this.cache = cache;
14+
}
15+
16+
@Override
17+
public R get(K key) {
18+
return cache.getIfPresent(key);
19+
}
20+
21+
@Override
22+
public R remove(K key) {
23+
var value = cache.getIfPresent(key);
24+
cache.invalidate(key);
25+
return value;
26+
}
27+
28+
@Override
29+
public void put(K key, R object) {
30+
cache.put(key, object);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
import java.time.Duration;
4+
5+
import io.fabric8.kubernetes.api.model.HasMetadata;
6+
import io.fabric8.kubernetes.client.KubernetesClient;
7+
8+
import com.github.benmanes.caffeine.cache.Cache;
9+
import com.github.benmanes.caffeine.cache.Caffeine;
10+
11+
/**
12+
* The idea about CaffeinBoundedItemStore-s is that, caffeine will cache the resources which were
13+
* recently used, and will evict resource, which are not used for a while. This is ideal from the
14+
* perspective that on startup controllers reconcile all resources (this is why a maxSize not ideal)
15+
* but after a while it can happen (well depending on the controller and domain) that only some
16+
* resources are actually active, thus related events happen. So in case large amount of custom
17+
* resources only the active once will remain in the cache. Note that if a resource is reconciled
18+
* all the secondary resources are usually reconciled too, in that case all those resources are
19+
* fetched and populated to the cache, and will remain there for some time, for a subsequent
20+
* reconciliations.
21+
*/
22+
public class CaffeineBoundedItemStores {
23+
24+
private CaffeineBoundedItemStores() {}
25+
26+
/**
27+
* @param client Kubernetes Client
28+
* @param rClass resource class
29+
* @param accessExpireDuration the duration after resources is evicted from cache if not accessed.
30+
* @return the ItemStore implementation
31+
* @param <R> resource type
32+
*/
33+
public static <R extends HasMetadata> BoundedItemStore<R> boundedItemStore(
34+
KubernetesClient client, Class<R> rClass,
35+
Duration accessExpireDuration) {
36+
Cache<String, R> cache = Caffeine.newBuilder()
37+
.expireAfterAccess(accessExpireDuration)
38+
.build();
39+
return boundedItemStore(client, rClass, cache);
40+
}
41+
42+
public static <R extends HasMetadata> BoundedItemStore<R> boundedItemStore(
43+
KubernetesClient client, Class<R> rClass, Cache<String, R> cache) {
44+
return new BoundedItemStore<>(new CaffeineBoundedCache<>(cache), rClass, client);
45+
}
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
import java.time.Duration;
4+
import java.util.stream.IntStream;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import io.fabric8.kubernetes.api.model.ConfigMap;
11+
import io.fabric8.kubernetes.client.CustomResource;
12+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
13+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec;
14+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestStatus;
15+
16+
import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.DATA_KEY;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.awaitility.Awaitility.await;
19+
20+
public abstract class BoundedCacheTestBase<P extends CustomResource<BoundedCacheTestSpec, BoundedCacheTestStatus>> {
21+
22+
private static final Logger log = LoggerFactory.getLogger(BoundedCacheTestBase.class);
23+
24+
public static final int NUMBER_OF_RESOURCE_TO_TEST = 3;
25+
public static final String RESOURCE_NAME_PREFIX = "test-";
26+
public static final String INITIAL_DATA_PREFIX = "data-";
27+
public static final String UPDATED_PREFIX = "updatedPrefix";
28+
29+
@Test
30+
void reconciliationWorksWithLimitedCache() {
31+
createTestResources();
32+
33+
assertConfigMapData(INITIAL_DATA_PREFIX);
34+
35+
updateTestResources();
36+
37+
assertConfigMapData(UPDATED_PREFIX);
38+
39+
deleteTestResources();
40+
41+
assertConfigMapsDeleted();
42+
}
43+
44+
private void assertConfigMapsDeleted() {
45+
await().atMost(Duration.ofSeconds(30))
46+
.untilAsserted(() -> IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
47+
var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i);
48+
assertThat(cm).isNull();
49+
}));
50+
}
51+
52+
private void deleteTestResources() {
53+
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
54+
var cm = extension().get(customResourceClass(), RESOURCE_NAME_PREFIX + i);
55+
var deleted = extension().delete(cm);
56+
if (!deleted) {
57+
log.warn("Custom resource might not be deleted: {}", cm);
58+
}
59+
});
60+
}
61+
62+
private void updateTestResources() {
63+
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
64+
var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i);
65+
cm.getData().put(DATA_KEY, UPDATED_PREFIX + i);
66+
extension().replace(cm);
67+
});
68+
}
69+
70+
void assertConfigMapData(String dataPrefix) {
71+
await().untilAsserted(() -> IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST)
72+
.forEach(i -> assertConfigMap(i, dataPrefix)));
73+
}
74+
75+
private void assertConfigMap(int i, String prefix) {
76+
var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i);
77+
assertThat(cm).isNotNull();
78+
assertThat(cm.getData().get(DATA_KEY)).isEqualTo(prefix + i);
79+
}
80+
81+
private void createTestResources() {
82+
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
83+
extension().create(createTestResource(i));
84+
});
85+
}
86+
87+
abstract P createTestResource(int index);
88+
89+
abstract Class<P> customResourceClass();
90+
91+
abstract LocallyRunOperatorExtension extension();
92+
93+
94+
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.jupiter.api.extension.RegisterExtension;
6+
7+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
8+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
9+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
10+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope.BoundedCacheClusterScopeTestCustomResource;
11+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope.BoundedCacheClusterScopeTestReconciler;
12+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec;
13+
14+
import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.boundedItemStore;
15+
16+
public class CaffeineBoundedCacheClusterScopeIT
17+
extends BoundedCacheTestBase<BoundedCacheClusterScopeTestCustomResource> {
18+
19+
@RegisterExtension
20+
LocallyRunOperatorExtension extension =
21+
LocallyRunOperatorExtension.builder()
22+
.withReconciler(new BoundedCacheClusterScopeTestReconciler(), o -> {
23+
o.withItemStore(boundedItemStore(
24+
new KubernetesClientBuilder().build(),
25+
BoundedCacheClusterScopeTestCustomResource.class,
26+
Duration.ofMinutes(1),
27+
1));
28+
})
29+
.build();
30+
31+
@Override
32+
BoundedCacheClusterScopeTestCustomResource createTestResource(int index) {
33+
var res = new BoundedCacheClusterScopeTestCustomResource();
34+
res.setMetadata(new ObjectMetaBuilder()
35+
.withName(RESOURCE_NAME_PREFIX + index)
36+
.build());
37+
res.setSpec(new BoundedCacheTestSpec());
38+
res.getSpec().setData(INITIAL_DATA_PREFIX + index);
39+
res.getSpec().setTargetNamespace(extension.getNamespace());
40+
return res;
41+
}
42+
43+
@Override
44+
Class<BoundedCacheClusterScopeTestCustomResource> customResourceClass() {
45+
return BoundedCacheClusterScopeTestCustomResource.class;
46+
}
47+
48+
@Override
49+
LocallyRunOperatorExtension extension() {
50+
return extension;
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.jupiter.api.extension.RegisterExtension;
6+
7+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
8+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
9+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
10+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestCustomResource;
11+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestReconciler;
12+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec;
13+
14+
import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.boundedItemStore;
15+
16+
class CaffeineBoundedCacheNamespacedIT
17+
extends BoundedCacheTestBase<BoundedCacheTestCustomResource> {
18+
19+
@RegisterExtension
20+
LocallyRunOperatorExtension extension =
21+
LocallyRunOperatorExtension.builder().withReconciler(new BoundedCacheTestReconciler(), o -> {
22+
o.withItemStore(boundedItemStore(
23+
new KubernetesClientBuilder().build(), BoundedCacheTestCustomResource.class,
24+
Duration.ofMinutes(1),
25+
1));
26+
})
27+
.build();
28+
29+
BoundedCacheTestCustomResource createTestResource(int index) {
30+
var res = new BoundedCacheTestCustomResource();
31+
res.setMetadata(new ObjectMetaBuilder()
32+
.withName(RESOURCE_NAME_PREFIX + index)
33+
.build());
34+
res.setSpec(new BoundedCacheTestSpec());
35+
res.getSpec().setData(INITIAL_DATA_PREFIX + index);
36+
res.getSpec().setTargetNamespace(extension.getNamespace());
37+
return res;
38+
}
39+
40+
@Override
41+
Class<BoundedCacheTestCustomResource> customResourceClass() {
42+
return BoundedCacheTestCustomResource.class;
43+
}
44+
45+
@Override
46+
LocallyRunOperatorExtension extension() {
47+
return extension;
48+
}
49+
50+
}

0 commit comments

Comments
 (0)