Skip to content

Commit 3751a68

Browse files
authored
feat: improved per resource polling (#1826)
1 parent 3ec1335 commit 3751a68

File tree

2 files changed

+157
-57
lines changed

2 files changed

+157
-57
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java

Lines changed: 80 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package io.javaoperatorsdk.operator.processing.event.source.polling;
22

3+
import java.time.Duration;
34
import java.util.*;
4-
import java.util.concurrent.ConcurrentHashMap;
5+
import java.util.concurrent.*;
56
import java.util.function.Predicate;
67

78
import org.slf4j.Logger;
@@ -32,8 +33,10 @@ public class PerResourcePollingEventSource<R, P extends HasMetadata>
3233

3334
private static final Logger log = LoggerFactory.getLogger(PerResourcePollingEventSource.class);
3435

35-
private final Timer timer = new Timer();
36-
private final Map<ResourceID, TimerTask> timerTasks = new ConcurrentHashMap<>();
36+
public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1;
37+
38+
private final ScheduledExecutorService executorService;
39+
private final Map<ResourceID, ScheduledFuture<Void>> scheduledFutures = new ConcurrentHashMap<>();
3740
private final ResourceFetcher<R, P> resourceFetcher;
3841
private final Cache<P> resourceCache;
3942
private final Predicate<P> registerPredicate;
@@ -57,11 +60,20 @@ public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
5760
Cache<P> resourceCache, long period,
5861
Predicate<P> registerPredicate, Class<R> resourceClass,
5962
CacheKeyMapper<R> cacheKeyMapper) {
63+
this(resourceFetcher, resourceCache, period, registerPredicate, resourceClass, cacheKeyMapper,
64+
new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER));
65+
}
66+
67+
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
68+
Cache<P> resourceCache, long period,
69+
Predicate<P> registerPredicate, Class<R> resourceClass,
70+
CacheKeyMapper<R> cacheKeyMapper, ScheduledExecutorService executorService) {
6071
super(resourceClass, cacheKeyMapper);
6172
this.resourceFetcher = resourceFetcher;
6273
this.resourceCache = resourceCache;
6374
this.period = period;
6475
this.registerPredicate = registerPredicate;
76+
this.executorService = executorService;
6577
}
6678

6779
private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
@@ -71,6 +83,17 @@ private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
7183
return values;
7284
}
7385

86+
@SuppressWarnings("unchecked")
87+
private void scheduleNextExecution(P primary, Set<R> actualResources) {
88+
var primaryID = ResourceID.fromResource(primary);
89+
var fetchDelay = resourceFetcher.fetchDelay(actualResources, primary);
90+
var fetchDuration = fetchDelay.orElse(Duration.ofMillis(period));
91+
92+
ScheduledFuture<Void> scheduledFuture = (ScheduledFuture<Void>) executorService
93+
.schedule(new FetchingExecutor(primaryID), fetchDuration.toMillis(), TimeUnit.MILLISECONDS);
94+
scheduledFutures.put(primaryID, scheduledFuture);
95+
}
96+
7497
@Override
7598
public void onResourceCreated(P resource) {
7699
checkAndRegisterTask(resource);
@@ -84,41 +107,53 @@ public void onResourceUpdated(P newResource, P oldResource) {
84107
@Override
85108
public void onResourceDeleted(P resource) {
86109
var resourceID = ResourceID.fromResource(resource);
87-
TimerTask task = timerTasks.remove(resourceID);
88-
if (task != null) {
89-
log.debug("Canceling task for resource: {}", resource);
90-
task.cancel();
110+
var scheduledFuture = scheduledFutures.remove(resourceID);
111+
if (scheduledFuture != null) {
112+
log.debug("Canceling scheduledFuture for resource: {}", resource);
113+
scheduledFuture.cancel(true);
91114
}
92115
handleDelete(resourceID);
93116
fetchedForPrimaries.remove(resourceID);
94117
}
95118

96119
// This method is always called from the same Thread for the same resource,
97120
// since events from ResourceEventAware are propagated from the thread of the informer. This is
98-
// important
99-
// because otherwise there will be a race condition related to the timerTasks.
121+
// important because otherwise there will be a race condition related to the timerTasks.
100122
private void checkAndRegisterTask(P resource) {
101123
var primaryID = ResourceID.fromResource(resource);
102-
if (timerTasks.get(primaryID) == null && (registerPredicate == null
124+
if (scheduledFutures.get(primaryID) == null && (registerPredicate == null
103125
|| registerPredicate.test(resource))) {
104-
var task =
105-
new TimerTask() {
106-
@Override
107-
public void run() {
108-
if (!isRunning()) {
109-
log.debug("Event source not yet started. Will not run for: {}", primaryID);
110-
return;
111-
}
112-
// always use up-to-date resource from cache
113-
var res = resourceCache.get(primaryID);
114-
res.ifPresentOrElse(p -> getAndCacheResource(p, false),
115-
() -> log.warn("No resource in cache for resource ID: {}", primaryID));
116-
}
117-
};
118-
timerTasks.put(primaryID, task);
119-
// there is a delay, to not do two fetches when the resources first appeared
126+
var cachedResources = cache.get(primaryID);
127+
var actualResources =
128+
cachedResources == null ? null : new HashSet<>(cachedResources.values());
129+
// note that there is a delay, to not do two fetches when the resources first appeared
120130
// and getSecondaryResource is called on reconciliation.
121-
timer.schedule(task, period, period);
131+
scheduleNextExecution(resource, actualResources);
132+
}
133+
}
134+
135+
private class FetchingExecutor implements Runnable {
136+
private final ResourceID primaryID;
137+
138+
public FetchingExecutor(ResourceID primaryID) {
139+
this.primaryID = primaryID;
140+
}
141+
142+
@Override
143+
public void run() {
144+
if (!isRunning()) {
145+
log.debug("Event source not yet started. Will not run for: {}", primaryID);
146+
return;
147+
}
148+
// always use up-to-date resource from cache
149+
var primary = resourceCache.get(primaryID);
150+
if (primary.isEmpty()) {
151+
log.warn("No resource in cache for resource ID: {}", primaryID);
152+
// no new execution is scheduled in this case, a on delete event should be received shortly
153+
} else {
154+
var actualResources = primary.map(p -> getAndCacheResource(p, false));
155+
scheduleNextExecution(primary.get(), actualResources.orElse(null));
156+
}
122157
}
123158
}
124159

@@ -146,12 +181,28 @@ public Set<R> getSecondaryResources(P primary) {
146181

147182
public interface ResourceFetcher<R, P> {
148183
Set<R> fetchResources(P primaryResource);
184+
185+
/**
186+
* By implementing this method it is possible to specify dynamic durations to wait between the
187+
* polls of the resources. This is especially handy if a resources "stabilized" so it is not
188+
* expected to change its state frequently. For example an AWS RDS instance is up and running,
189+
* it is expected to run and be stable for a very long time. In this case it is enough to poll
190+
* with a lower frequency, compared to the phase when it is being initialized.
191+
*
192+
* @param lastFetchedResource might be null, in case no fetch happened before. Empty set if
193+
* fetch happened but no resources were found.
194+
* @param primary related primary resource
195+
* @return an Optional containing the Duration to wait until the next fetch. If an empty
196+
* Optional is returned, the default polling period will be used.
197+
*/
198+
default Optional<Duration> fetchDelay(Set<R> lastFetchedResource, P primary) {
199+
return Optional.empty();
200+
}
149201
}
150202

151203
@Override
152204
public void stop() throws OperatorException {
153205
super.stop();
154-
timer.cancel();
206+
executorService.shutdownNow();
155207
}
156-
157208
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java

Lines changed: 77 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.event.source.polling;
22

3+
import java.time.Duration;
34
import java.util.Collections;
45
import java.util.Optional;
56
import java.util.Set;
@@ -16,6 +17,7 @@
1617
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
1718

1819
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.awaitility.Awaitility.await;
1921
import static org.mockito.ArgumentMatchers.any;
2022
import static org.mockito.ArgumentMatchers.eq;
2123
import static org.mockito.Mockito.atLeast;
@@ -47,45 +49,50 @@ public void setup() {
4749
}
4850

4951
@Test
50-
void pollsTheResourceAfterAwareOfIt() throws InterruptedException {
52+
void pollsTheResourceAfterAwareOfIt() {
5153
source.onResourceCreated(testCustomResource);
5254

53-
Thread.sleep(3 * PERIOD);
54-
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
55-
verify(eventHandler, times(1)).handleEvent(any());
55+
await().pollDelay(Duration.ofMillis(3 * PERIOD)).untilAsserted(() -> {
56+
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
57+
verify(supplier, atLeast(2)).fetchDelay(any(), eq(testCustomResource));
58+
verify(eventHandler, times(1)).handleEvent(any());
59+
});
5660
}
5761

5862
@Test
59-
void registeringTaskOnAPredicate() throws InterruptedException {
63+
void registeringTaskOnAPredicate() {
6064
setUpSource(new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD,
6165
testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1,
6266
SampleExternalResource.class, CacheKeyMapper.singleResourceCacheKeyMapper()));
6367
source.onResourceCreated(testCustomResource);
64-
Thread.sleep(2 * PERIOD);
6568

66-
verify(supplier, times(0)).fetchResources(eq(testCustomResource));
69+
70+
await().pollDelay(Duration.ofMillis(2 * PERIOD))
71+
.untilAsserted(() -> verify(supplier, times(0)).fetchResources(eq(testCustomResource)));
72+
6773
testCustomResource.getMetadata().setGeneration(2L);
6874
source.onResourceUpdated(testCustomResource, testCustomResource);
6975

70-
Thread.sleep(2 * PERIOD);
7176

72-
verify(supplier, atLeast(1)).fetchResources(eq(testCustomResource));
77+
await().pollDelay(Duration.ofMillis(2 * PERIOD))
78+
.untilAsserted(() -> verify(supplier, atLeast(1)).fetchResources(eq(testCustomResource)));
7379
}
7480

7581
@Test
76-
void propagateEventOnDeletedResource() throws InterruptedException {
82+
void propagateEventOnDeletedResource() {
7783
source.onResourceCreated(testCustomResource);
7884
when(supplier.fetchResources(any()))
7985
.thenReturn(Set.of(SampleExternalResource.testResource1()))
8086
.thenReturn(Collections.emptySet());
8187

82-
Thread.sleep(3 * PERIOD);
83-
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
84-
verify(eventHandler, times(2)).handleEvent(any());
88+
await().pollDelay(Duration.ofMillis(3 * PERIOD)).untilAsserted(() -> {
89+
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
90+
verify(eventHandler, times(2)).handleEvent(any());
91+
});
8592
}
8693

8794
@Test
88-
void getSecondaryResourceInitiatesFetchJustForFirstTime() throws InterruptedException {
95+
void getSecondaryResourceInitiatesFetchJustForFirstTime() {
8996
source.onResourceCreated(testCustomResource);
9097
when(supplier.fetchResources(any()))
9198
.thenReturn(Set.of(SampleExternalResource.testResource1()))
@@ -104,31 +111,73 @@ void getSecondaryResourceInitiatesFetchJustForFirstTime() throws InterruptedExce
104111
verify(supplier, times(1)).fetchResources(eq(testCustomResource));
105112
verify(eventHandler, never()).handleEvent(any());
106113

107-
Thread.sleep(PERIOD * 2);
108-
109-
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
110-
value = source.getSecondaryResources(testCustomResource);
111-
assertThat(value).hasSize(2);
114+
await().pollDelay(Duration.ofMillis(PERIOD * 2)).untilAsserted(() -> {
115+
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
116+
var val = source.getSecondaryResources(testCustomResource);
117+
assertThat(val).hasSize(2);
118+
});
112119
}
113120

114121
@Test
115-
void getsValueFromCacheOrSupplier() throws InterruptedException {
122+
void getsValueFromCacheOrSupplier() {
116123
source.onResourceCreated(testCustomResource);
117124
when(supplier.fetchResources(any()))
118125
.thenReturn(Collections.emptySet())
119126
.thenReturn(Set.of(SampleExternalResource.testResource1()));
120127

121-
Thread.sleep(PERIOD / 3);
128+
await().pollDelay(Duration.ofMillis(PERIOD / 3)).untilAsserted(() -> {
129+
var value = source.getSecondaryResources(testCustomResource);
130+
verify(eventHandler, times(0)).handleEvent(any());
131+
assertThat(value).isEmpty();
132+
});
133+
134+
await().pollDelay(Duration.ofMillis(PERIOD * 2)).untilAsserted(() -> {
135+
var value2 = source.getSecondaryResources(testCustomResource);
136+
assertThat(value2).hasSize(1);
137+
verify(eventHandler, times(1)).handleEvent(any());
138+
});
139+
}
122140

123-
var value = source.getSecondaryResources(testCustomResource);
124-
verify(eventHandler, times(0)).handleEvent(any());
125-
assertThat(value).isEmpty();
141+
@Test
142+
void supportsDynamicPollingDelay() {
143+
when(supplier.fetchResources(any()))
144+
.thenReturn(Set.of(SampleExternalResource.testResource1()));
145+
when(supplier.fetchDelay(any(),any()))
146+
.thenReturn(Optional.of(Duration.ofMillis(PERIOD)))
147+
.thenReturn(Optional.of(Duration.ofMillis(PERIOD*2)));
126148

127-
Thread.sleep(PERIOD * 2);
149+
source.onResourceCreated(testCustomResource);
128150

129-
value = source.getSecondaryResources(testCustomResource);
130-
assertThat(value).hasSize(1);
131-
verify(eventHandler, times(1)).handleEvent(any());
151+
await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis((long) (1.5 * PERIOD)))
152+
.pollInterval(Duration.ofMillis(20))
153+
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));
154+
// verifying that it is not called as with normal interval
155+
await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis((long) (1.5*PERIOD)))
156+
.pollInterval(Duration.ofMillis(20))
157+
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));
158+
await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis(2 * PERIOD))
159+
.pollInterval(Duration.ofMillis(20))
160+
.untilAsserted(() -> verify(supplier,times(2)).fetchResources(any()));
161+
}
162+
163+
@Test
164+
void deleteEventCancelsTheScheduling() {
165+
when(supplier.fetchResources(any()))
166+
.thenReturn(Set.of(SampleExternalResource.testResource1()));
167+
168+
source.onResourceCreated(testCustomResource);
169+
170+
await().pollDelay(Duration.ofMillis(PERIOD))
171+
.atMost(Duration.ofMillis((2* PERIOD)))
172+
.pollInterval(Duration.ofMillis(20))
173+
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));
174+
175+
source.onResourceDeleted(testCustomResource);
176+
177+
// check if not called again.
178+
await().pollDelay(Duration.ofMillis(PERIOD))
179+
.atMost(Duration.ofMillis((2* PERIOD)))
180+
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));
132181
}
133182

134183
}

0 commit comments

Comments
 (0)