Skip to content

feat: graceful shutdown #1735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator;

import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -77,14 +78,31 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur
}

/**
* Adds a shutdown hook by delegating to {@link #installShutdownHook(Duration)}, using
* {@link ConfigurationService#getTerminationTimeoutSeconds()} (interpreted as seconds) as the
* graceful shutdown timeout.
*
* @deprecated use the overloaded version with the graceful shutdown timeout parameter,
* {@link #installShutdownHook(Duration)}.
*/
@Deprecated(forRemoval = true)
public void installShutdownHook() {
installShutdownHook(
Duration.ofSeconds(ConfigurationServiceProvider.instance().getTerminationTimeoutSeconds()));
}

/**
* Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note
* that graceful shutdown is usually not needed, but your {@link Reconciler} implementations might
* require it.
* <p>
* Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the
* controller.
*
* @param gracefulShutdownTimeout timeout to wait for executor threads to complete actual
* reconciliations
*/
public void installShutdownHook(Duration gracefulShutdownTimeout) {
if (!leaderElectionManager.isLeaderElectionEnabled()) {
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout)));
} else {
log.warn("Leader election is on, shutdown hook will not be installed.");
}
Expand Down Expand Up @@ -126,14 +144,16 @@ public synchronized void start() {
}
}

@Override
public void stop() throws OperatorException {
public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
if (!started) {
return;
}
final var configurationService = ConfigurationServiceProvider.instance();
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
controllerManager.stop();

ExecutorServiceManager.stop();
ExecutorServiceManager.stop(gracefulShutdownTimeout);
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
kubernetesClient.close();
Expand All @@ -142,6 +162,11 @@ public void stop() throws OperatorException {
started = false;
}

/**
* Stops the operator with a zero graceful shutdown timeout by delegating to
* {@link #stop(Duration)} with {@link Duration#ZERO}.
*
* @throws OperatorException declared for callers; raised, if at all, by {@link #stop(Duration)}
*/
@Override
public void stop() throws OperatorException {
stop(Duration.ZERO);
}

/**
* Adds a registration request for the specified reconciler with this operator. The effective
* registration of the reconciler is delayed till the operator is started.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ public HasMetadata clone(HasMetadata object) {
* Retrieves the number of seconds the SDK waits for reconciliation threads to terminate before
* shutting down.
*
* @deprecated use {@link io.javaoperatorsdk.operator.Operator#stop(Duration)} instead. Where the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: why not keep this? It could be useful to be able to configure the default timeout as previously…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to have a single way to configure this, and it is much simpler with the parameters. Keeping this would mean having two ways to do the same thing.

* parameter can be passed to specify graceful timeout.
*
* @return the number of seconds to wait before terminating reconciliation threads
*/
@Deprecated(forRemoval = true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs an associated @deprecated tag in the javadoc…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

default int getTerminationTimeoutSeconds() {
return DEFAULT_TERMINATION_TIMEOUT_SECONDS;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -24,23 +25,20 @@ public class ExecutorServiceManager {
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
private final ExecutorService cachingExecutorService;
private final int terminationTimeoutSeconds;

private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
int terminationTimeoutSeconds) {
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor) {
this.cachingExecutorService = Executors.newCachedThreadPool();
this.executor = new InstrumentedExecutorService(executor);
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
this.terminationTimeoutSeconds = terminationTimeoutSeconds;

}

public static void init() {
public static synchronized void init() {
if (instance == null) {
final var configuration = ConfigurationServiceProvider.instance();
final var executorService = configuration.getExecutorService();
final var workflowExecutorService = configuration.getWorkflowExecutorService();
instance = new ExecutorServiceManager(executorService, workflowExecutorService,
configuration.getTerminationTimeoutSeconds());
instance = new ExecutorServiceManager(executorService, workflowExecutorService);
log.debug(
"Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}",
executorService.getClass(),
Expand All @@ -51,16 +49,23 @@ public static void init() {
}
}

public static synchronized void stop() {
/**
* For testing purposes only.
* <p>
* Stops the executors of the current instance with a zero timeout (obtaining the instance through
* {@link #instance()}, which initializes one on demand if none exists), clears the singleton and
* then re-creates it through {@link #init()}.
*/
public static synchronized void reset() {
instance().doStop(Duration.ZERO);
instance = null;
init();
}

public static synchronized void stop(Duration gracefulShutdownTimeout) {
if (instance != null) {
instance.doStop();
instance.doStop(gracefulShutdownTimeout);
}
// make sure that we remove the singleton so that the thread pool is re-created on next call to
// start
instance = null;
}

public synchronized static ExecutorServiceManager instance() {
public static synchronized ExecutorServiceManager instance() {
if (instance == null) {
// provide a default configuration if none has been provided by init
init();
Expand Down Expand Up @@ -128,23 +133,30 @@ public ExecutorService cachingExecutorService() {
return cachingExecutorService;
}

private void doStop() {
private void doStop(Duration gracefulShutdownTimeout) {
try {
var parallelExec = Executors.newFixedThreadPool(3);
log.debug("Closing executor");
shutdown(executor);
shutdown(workflowExecutor);
shutdown(cachingExecutorService);
parallelExec.invokeAll(List.of(shutdown(executor, gracefulShutdownTimeout),
shutdown(workflowExecutor, gracefulShutdownTimeout),
shutdown(cachingExecutorService, gracefulShutdownTimeout)));
parallelExec.shutdownNow();
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
Thread.currentThread().interrupt();
}
}

private static void shutdown(ExecutorService executorService) throws InterruptedException {
executorService.shutdown();
if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) {
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
}
private static Callable<Void> shutdown(ExecutorService executorService,
Duration gracefulShutdownTimeout) {
return () -> {
executorService.shutdown();
if (!executorService.awaitTermination(gracefulShutdownTimeout.toMillis(),
TimeUnit.MILLISECONDS)) {
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
}
return null;
};
}

private static class InstrumentedExecutorService implements ExecutorService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,13 @@ private ReconcilerExecutor(ResourceID resourceID, ExecutionScope<P> executionSco

@Override
public void run() {
if (!running) {
// this is needed for the case when the controller is stopped, but there is a graceful shutdown
// timeout. That should finish the currently executing reconciliations but not the ones
// which were submitted but not started yet
log.debug("Event processor not running skipping resource processing: {}", resourceID);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this missing a return statement here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, wonder how the test passed, will check

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed the unit test, was not trivial, wonder if we should refactor this in the future.

return;
}
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import static org.junit.Assert.assertEquals;

public class VersionTest {
class VersionTest {

@Test
void versionShouldReturnTheSameResultFromMavenAndProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
Expand All @@ -39,6 +41,7 @@
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -56,6 +59,8 @@ class EventProcessorTest {
public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250;
public static final int SEPARATE_EXECUTION_TIMEOUT = 450;
public static final String TEST_NAMESPACE = "default-event-handler-test";
public static final int TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION = 150;
public static final int DISPATCHING_DELAY = 250;

private final ReconciliationDispatcher reconciliationDispatcherMock =
mock(ReconciliationDispatcher.class);
Expand Down Expand Up @@ -417,6 +422,28 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
}

/**
* Checks that the dispatcher is invoked at most once when two events are handed to the event
* processor and the processor is stopped right afterwards, with the executor limited to a single
* reconciliation thread and each dispatch taking {@code DISPATCHING_DELAY} milliseconds.
*
* @throws InterruptedException if the test thread is interrupted while sleeping
*/
@Test
void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException {
// make every dispatch take DISPATCHING_DELAY ms so a running reconciliation occupies its thread
when(reconciliationDispatcherMock.handleExecution(any()))
.then((Answer<PostExecutionControl>) invocationOnMock -> {
Thread.sleep(DISPATCHING_DELAY);
return PostExecutionControl.defaultDispatch();
});
// with a single reconciliation thread, one event will occupy the whole executor
ConfigurationServiceProvider.overrideCurrent(o -> o.withConcurrentReconciliationThreads(1));
// re-create the executor service manager; presumably needed so that the overridden thread
// count takes effect - confirm against ExecutorServiceManager.init()
ExecutorServiceManager.reset();
eventProcessor.start();

eventProcessor.handleEvent(prepareCREvent());
eventProcessor.handleEvent(prepareCREvent());
eventProcessor.stop();

// wait long enough that both events would have been handled if they had both been executed
Thread.sleep(TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION + 2 * DISPATCHING_DELAY);
// at most one dispatch may have happened after the processor was stopped
verify(reconciliationDispatcherMock, atMostOnce())
.handleExecution(any());
}

private ResourceID eventAlreadyUnderProcessing() {
when(reconciliationDispatcherMock.handleExecution(any()))
.then(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ static void classSetup() {
* equals will fail on the two equal but NOT identical TestCustomResources because equals is not
* implemented on TestCustomResourceSpec or TestCustomResourceStatus
*/
ConfigurationServiceProvider.reset();
ConfigurationServiceProvider.overrideCurrent(overrider -> overrider
.checkingCRDAndValidateLocalModel(false).withResourceCloner(new Cloner() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.javaoperatorsdk.operator;

import java.time.Duration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResource;
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResourceSpec;
import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler;

import static io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler.RECONCILER_SLEEP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
 * Integration test for stopping the operator with a graceful shutdown timeout while a
 * reconciliation is in progress.
 * <p>
 * Each scenario creates a resource, waits for its first reconciliation, updates the spec, waits
 * until the second reconciliation has started, stops the operator with the given timeout and
 * finally checks which generation ended up being observed in the resource status.
 */
public class GracefulStopIT {

  public static final String TEST_1 = "test1";
  public static final String TEST_2 = "test2";

  @RegisterExtension
  LocallyRunOperatorExtension operator =
      LocallyRunOperatorExtension.builder()
          .withConfigurationService(overrider -> overrider.withCloseClientOnStop(false))
          .withReconciler(new GracefulStopTestReconciler())
          .build();

  /** Timeout equal to the reconciler sleep: generation 2 is expected to be observed. */
  @Test
  void stopsGracefullyWIthTimeout() {
    testGracefulStop(TEST_1, RECONCILER_SLEEP, 2);
  }

  /** Timeout of a fifth of the reconciler sleep: the observed generation is expected to stay 1. */
  @Test
  void stopsGracefullyWithExpiredTimeout() {
    testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1);
  }

  /**
   * Runs one graceful stop scenario.
   *
   * @param resourceName name of the custom resource to create
   * @param stopTimeout graceful shutdown timeout passed to the operator, in milliseconds
   * @param expectedFinalGeneration generation expected in the resource status after the stop
   */
  private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) {
    var created = operator.create(testResource(resourceName));

    // the initial reconciliation must have completed exactly once
    await().untilAsserted(() -> {
      assertObservedGeneration(resourceName, 1);
      assertNumberOfExecutions(1);
    });

    // trigger a second reconciliation by changing the spec
    created.getSpec().setValue(2);
    operator.replace(created);

    await().pollDelay(Duration.ofMillis(50)).untilAsserted(() -> assertNumberOfExecutions(2));

    operator.getOperator().stop(Duration.ofMillis(stopTimeout));

    await().untilAsserted(() -> assertObservedGeneration(resourceName, expectedFinalGeneration));
  }

  /** Asserts that the named resource has a status carrying the given observed generation. */
  private void assertObservedGeneration(String resourceName, int expectedGeneration) {
    var status = operator.get(GracefulStopTestCustomResource.class, resourceName).getStatus();
    assertThat(status).isNotNull();
    assertThat(status.getObservedGeneration()).isEqualTo(expectedGeneration);
  }

  /** Asserts how many times the test reconciler reports having been executed. */
  private void assertNumberOfExecutions(int expectedExecutions) {
    assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
        .getNumberOfExecutions()).isEqualTo(expectedExecutions);
  }

  /**
   * Builds a test resource in the operator's namespace whose spec value is 1.
   *
   * @param name the resource name
   * @return the new, not yet created resource
   */
  public GracefulStopTestCustomResource testResource(String name) {
    var spec = new GracefulStopTestCustomResourceSpec();
    spec.setValue(1);

    var metadata = new ObjectMetaBuilder()
        .withName(name)
        .withNamespace(operator.getNamespace())
        .build();

    var resource = new GracefulStopTestCustomResource();
    resource.setMetadata(metadata);
    resource.setSpec(spec);
    return resource;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.javaoperatorsdk.operator.sample.gracefulstop;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

/**
* Namespaced custom resource used by the graceful stop tests, declared in group
* {@code sample.javaoperatorsdk}, version {@code v1}, with the short name {@code gst}. It adds no
* members of its own beyond the spec and status type parameters.
*/
@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("gst")
public class GracefulStopTestCustomResource
extends CustomResource<GracefulStopTestCustomResourceSpec, GracefulStopTestCustomResourceStatus>
implements Namespaced {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.sample.gracefulstop;

/** Spec of the graceful stop test custom resource; it carries a single integer value. */
public class GracefulStopTestCustomResourceSpec {

  /** The value held by this spec; {@code 0} until set. */
  private int value;

  /**
   * @return the value currently held by this spec
   */
  public int getValue() {
    return this.value;
  }

  /**
   * @param value the value to store in this spec
   */
  public void setValue(int value) {
    this.value = value;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.javaoperatorsdk.operator.sample.gracefulstop;

import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus;

/**
* Status of the graceful stop test custom resource. It declares no members of its own; everything
* it exposes is inherited from {@link ObservedGenerationAwareStatus}.
*/
public class GracefulStopTestCustomResourceStatus extends ObservedGenerationAwareStatus {

}
Loading