Skip to content

Commit 2e214a8

Browse files
committed
feat: graceful shutdown (#1735)
1 parent acf3c7a commit 2e214a8

File tree

12 files changed

+252
-26
lines changed

12 files changed

+252
-26
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator;
22

3+
import java.time.Duration;
34
import java.util.HashSet;
45
import java.util.Optional;
56
import java.util.Set;
@@ -77,14 +78,31 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur
7778
}
7879

7980
/**
80-
* Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down.
81+
* Uses {@link ConfigurationService#getTerminationTimeoutSeconds()} for graceful shutdown timeout
82+
*
83+
* @deprecated use the overloaded version with graceful shutdown timeout parameter.
8184
*
82-
* @deprecated This feature should not be used anymore
8385
*/
8486
@Deprecated(forRemoval = true)
8587
public void installShutdownHook() {
88+
installShutdownHook(
89+
Duration.ofSeconds(ConfigurationServiceProvider.instance().getTerminationTimeoutSeconds()));
90+
}
91+
92+
/**
93+
* Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note
94+
* that graceful shutdown is usually not needed, but your {@link Reconciler} implementations might
95+
* require it.
96+
* <p>
97+
* Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the
98+
* controller.
99+
*
100+
* @param gracefulShutdownTimeout timeout to wait for executor threads to complete actual
101+
* reconciliations
102+
*/
103+
public void installShutdownHook(Duration gracefulShutdownTimeout) {
86104
if (!leaderElectionManager.isLeaderElectionEnabled()) {
87-
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
105+
Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout)));
88106
} else {
89107
log.warn("Leader election is on, shutdown hook will not be installed.");
90108
}
@@ -126,14 +144,16 @@ public synchronized void start() {
126144
}
127145
}
128146

129-
@Override
130-
public void stop() throws OperatorException {
147+
public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
148+
if (!started) {
149+
return;
150+
}
131151
final var configurationService = ConfigurationServiceProvider.instance();
132152
log.info(
133153
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
134154
controllerManager.stop();
135155

136-
ExecutorServiceManager.stop();
156+
ExecutorServiceManager.stop(gracefulShutdownTimeout);
137157
leaderElectionManager.stop();
138158
if (configurationService.closeClientOnStop()) {
139159
kubernetesClient.close();
@@ -142,6 +162,11 @@ public void stop() throws OperatorException {
142162
started = false;
143163
}
144164

165+
@Override
166+
public void stop() throws OperatorException {
167+
stop(Duration.ZERO);
168+
}
169+
145170
/**
146171
* Adds a registration request for the specified reconciler with this operator. The effective
147172
* registration of the reconciler is delayed till the operator is started.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,12 @@ public HasMetadata clone(HasMetadata object) {
121121
* Retrieves the number of seconds the SDK waits for reconciliation threads to terminate before
122122
* shutting down.
123123
*
124+
* @deprecated use {@link io.javaoperatorsdk.operator.Operator#stop(Duration)} instead, where the
125+
* graceful shutdown timeout can be passed as a parameter.
126+
*
124127
* @return the number of seconds to wait before terminating reconciliation threads
125128
*/
129+
@Deprecated(forRemoval = true)
126130
default int getTerminationTimeoutSeconds() {
127131
return DEFAULT_TERMINATION_TIMEOUT_SECONDS;
128132
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
34
import java.util.Collection;
45
import java.util.List;
56
import java.util.concurrent.Callable;
@@ -24,23 +25,20 @@ public class ExecutorServiceManager {
2425
private final ExecutorService executor;
2526
private final ExecutorService workflowExecutor;
2627
private final ExecutorService cachingExecutorService;
27-
private final int terminationTimeoutSeconds;
2828

29-
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
30-
int terminationTimeoutSeconds) {
29+
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor) {
3130
this.cachingExecutorService = Executors.newCachedThreadPool();
3231
this.executor = new InstrumentedExecutorService(executor);
3332
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
34-
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
33+
3534
}
3635

37-
public static void init() {
36+
public static synchronized void init() {
3837
if (instance == null) {
3938
final var configuration = ConfigurationServiceProvider.instance();
4039
final var executorService = configuration.getExecutorService();
4140
final var workflowExecutorService = configuration.getWorkflowExecutorService();
42-
instance = new ExecutorServiceManager(executorService, workflowExecutorService,
43-
configuration.getTerminationTimeoutSeconds());
41+
instance = new ExecutorServiceManager(executorService, workflowExecutorService);
4442
log.debug(
4543
"Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}",
4644
executorService.getClass(),
@@ -51,16 +49,23 @@ public static void init() {
5149
}
5250
}
5351

54-
public static synchronized void stop() {
52+
/** For testing purposes only */
53+
public static synchronized void reset() {
54+
instance().doStop(Duration.ZERO);
55+
instance = null;
56+
init();
57+
}
58+
59+
public static synchronized void stop(Duration gracefulShutdownTimeout) {
5560
if (instance != null) {
56-
instance.doStop();
61+
instance.doStop(gracefulShutdownTimeout);
5762
}
5863
// make sure that we remove the singleton so that the thread pool is re-created on next call to
5964
// start
6065
instance = null;
6166
}
6267

63-
public synchronized static ExecutorServiceManager instance() {
68+
public static synchronized ExecutorServiceManager instance() {
6469
if (instance == null) {
6570
// provide a default configuration if none has been provided by init
6671
init();
@@ -128,23 +133,30 @@ public ExecutorService cachingExecutorService() {
128133
return cachingExecutorService;
129134
}
130135

131-
private void doStop() {
136+
private void doStop(Duration gracefulShutdownTimeout) {
132137
try {
138+
var parallelExec = Executors.newFixedThreadPool(3);
133139
log.debug("Closing executor");
134-
shutdown(executor);
135-
shutdown(workflowExecutor);
136-
shutdown(cachingExecutorService);
140+
parallelExec.invokeAll(List.of(shutdown(executor, gracefulShutdownTimeout),
141+
shutdown(workflowExecutor, gracefulShutdownTimeout),
142+
shutdown(cachingExecutorService, gracefulShutdownTimeout)));
143+
parallelExec.shutdownNow();
137144
} catch (InterruptedException e) {
138145
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
139146
Thread.currentThread().interrupt();
140147
}
141148
}
142149

143-
private static void shutdown(ExecutorService executorService) throws InterruptedException {
144-
executorService.shutdown();
145-
if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) {
146-
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
147-
}
150+
private static Callable<Void> shutdown(ExecutorService executorService,
151+
Duration gracefulShutdownTimeout) {
152+
return () -> {
153+
executorService.shutdown();
154+
if (!executorService.awaitTermination(gracefulShutdownTimeout.toMillis(),
155+
TimeUnit.MILLISECONDS)) {
156+
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
157+
}
158+
return null;
159+
};
148160
}
149161

150162
private static class InstrumentedExecutorService implements ExecutorService {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,13 @@ private ReconcilerExecutor(ResourceID resourceID, ExecutionScope<P> executionSco
388388

389389
@Override
390390
public void run() {
391+
if (!running) {
392+
// this is needed for the case when the controller is stopped, but there is a graceful shutdown
393+
// timeout: the currently executing reconciliations should finish, but not the ones
394+
// which were submitted but not started yet
395+
log.debug("Event processor not running skipping resource processing: {}", resourceID);
396+
return;
397+
}
391398
// change thread name for easier debugging
392399
final var thread = Thread.currentThread();
393400
final var name = thread.getName();

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import static org.junit.Assert.assertEquals;
66

7-
public class VersionTest {
7+
class VersionTest {
88

99
@Test
1010
void versionShouldReturnTheSameResultFromMavenAndProperties() {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import org.slf4j.LoggerFactory;
1616

1717
import io.fabric8.kubernetes.api.model.HasMetadata;
18+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1819
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
20+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1921
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
2022
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
2123
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
@@ -39,6 +41,7 @@
3941
import static org.mockito.Mockito.after;
4042
import static org.mockito.Mockito.any;
4143
import static org.mockito.Mockito.anyLong;
44+
import static org.mockito.Mockito.atMostOnce;
4245
import static org.mockito.Mockito.doAnswer;
4346
import static org.mockito.Mockito.mock;
4447
import static org.mockito.Mockito.never;
@@ -56,6 +59,8 @@ class EventProcessorTest {
5659
public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250;
5760
public static final int SEPARATE_EXECUTION_TIMEOUT = 450;
5861
public static final String TEST_NAMESPACE = "default-event-handler-test";
62+
public static final int TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION = 150;
63+
public static final int DISPATCHING_DELAY = 250;
5964

6065
private final ReconciliationDispatcher reconciliationDispatcherMock =
6166
mock(ReconciliationDispatcher.class);
@@ -417,6 +422,28 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
417422
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
418423
}
419424

425+
@Test
426+
void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException {
427+
when(reconciliationDispatcherMock.handleExecution(any()))
428+
.then((Answer<PostExecutionControl>) invocationOnMock -> {
429+
Thread.sleep(DISPATCHING_DELAY);
430+
return PostExecutionControl.defaultDispatch();
431+
});
432+
// one event will lock the thread / executor
433+
ConfigurationServiceProvider.overrideCurrent(o -> o.withConcurrentReconciliationThreads(1));
434+
ExecutorServiceManager.reset();
435+
eventProcessor.start();
436+
437+
eventProcessor.handleEvent(prepareCREvent());
438+
eventProcessor.handleEvent(prepareCREvent());
439+
eventProcessor.stop();
440+
441+
// wait until both events would have been handled
442+
Thread.sleep(TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION + 2 * DISPATCHING_DELAY);
443+
verify(reconciliationDispatcherMock, atMostOnce())
444+
.handleExecution(any());
445+
}
446+
420447
private ResourceID eventAlreadyUnderProcessing() {
421448
when(reconciliationDispatcherMock.handleExecution(any()))
422449
.then(

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ static void classSetup() {
7575
* equals will fail on the two equal but NOT identical TestCustomResources because equals is not
7676
* implemented on TestCustomResourceSpec or TestCustomResourceStatus
7777
*/
78+
ConfigurationServiceProvider.reset();
7879
ConfigurationServiceProvider.overrideCurrent(overrider -> overrider
7980
.checkingCRDAndValidateLocalModel(false).withResourceCloner(new Cloner() {
8081
@Override
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.RegisterExtension;
7+
8+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
9+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
10+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResource;
11+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResourceSpec;
12+
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler;
13+
14+
import static io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler.RECONCILER_SLEEP;
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.awaitility.Awaitility.await;
17+
18+
public class GracefulStopIT {
19+
20+
public static final String TEST_1 = "test1";
21+
public static final String TEST_2 = "test2";
22+
23+
@RegisterExtension
24+
LocallyRunOperatorExtension operator =
25+
LocallyRunOperatorExtension.builder()
26+
.withConfigurationService(o -> o.withCloseClientOnStop(false))
27+
.withReconciler(new GracefulStopTestReconciler())
28+
.build();
29+
30+
@Test
31+
void stopsGracefullyWIthTimeout() {
32+
testGracefulStop(TEST_1, RECONCILER_SLEEP, 2);
33+
}
34+
35+
@Test
36+
void stopsGracefullyWithExpiredTimeout() {
37+
testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1);
38+
}
39+
40+
private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) {
41+
var testRes = operator.create(testResource(resourceName));
42+
await().untilAsserted(() -> {
43+
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
44+
assertThat(r.getStatus()).isNotNull();
45+
assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1);
46+
assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
47+
.getNumberOfExecutions()).isEqualTo(1);
48+
});
49+
50+
testRes.getSpec().setValue(2);
51+
operator.replace(testRes);
52+
53+
await().pollDelay(Duration.ofMillis(50)).untilAsserted(
54+
() -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
55+
.getNumberOfExecutions()).isEqualTo(2));
56+
57+
operator.getOperator().stop(Duration.ofMillis(stopTimeout));
58+
59+
await().untilAsserted(() -> {
60+
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
61+
assertThat(r.getStatus()).isNotNull();
62+
assertThat(r.getStatus().getObservedGeneration()).isEqualTo(expectedFinalGeneration);
63+
});
64+
}
65+
66+
public GracefulStopTestCustomResource testResource(String name) {
67+
GracefulStopTestCustomResource resource =
68+
new GracefulStopTestCustomResource();
69+
resource.setMetadata(
70+
new ObjectMetaBuilder()
71+
.withName(name)
72+
.withNamespace(operator.getNamespace())
73+
.build());
74+
resource.setSpec(new GracefulStopTestCustomResourceSpec());
75+
resource.getSpec().setValue(1);
76+
return resource;
77+
}
78+
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.javaoperatorsdk.operator.sample.gracefulstop;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("gst")
12+
public class GracefulStopTestCustomResource
13+
extends CustomResource<GracefulStopTestCustomResourceSpec, GracefulStopTestCustomResourceStatus>
14+
implements Namespaced {
15+
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.javaoperatorsdk.operator.sample.gracefulstop;
2+
3+
public class GracefulStopTestCustomResourceSpec {
4+
5+
private int value;
6+
7+
public int getValue() {
8+
return value;
9+
}
10+
11+
public void setValue(int value) {
12+
this.value = value;
13+
}
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.javaoperatorsdk.operator.sample.gracefulstop;
2+
3+
import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus;
4+
5+
public class GracefulStopTestCustomResourceStatus extends ObservedGenerationAwareStatus {
6+
7+
}

0 commit comments

Comments
 (0)