diff --git a/spring-boot-project/spring-boot-autoconfigure/build.gradle b/spring-boot-project/spring-boot-autoconfigure/build.gradle index 6d062d9ce6c2..bf59b82c699b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/build.gradle +++ b/spring-boot-project/spring-boot-autoconfigure/build.gradle @@ -232,6 +232,8 @@ dependencies { optional("org.thymeleaf.extras:thymeleaf-extras-springsecurity6") optional("redis.clients:jedis") + compileOnly("org.crac:crac") + testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support")) testImplementation(project(":spring-boot-project:spring-boot-test")) testImplementation("ch.qos.logback:logback-classic") diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java index 7d0f78be8cf4..898b4985a113 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java @@ -56,6 +56,7 @@ import org.springframework.boot.ssl.SslBundles; import org.springframework.boot.ssl.SslOptions; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Scope; import org.springframework.core.io.Resource; @@ -80,12 +81,14 @@ @AutoConfiguration @ConditionalOnClass(CqlSession.class) @EnableConfigurationProperties(CassandraProperties.class) +@Import({ CassandraCheckpointRestoreConfiguration.class }) public class CassandraAutoConfiguration { private static final Config SPRING_BOOT_DEFAULTS; static { CassandraDriverOptions options = new CassandraDriverOptions(); options.add(DefaultDriverOption.CONTACT_POINTS, Collections.singletonList("127.0.0.1:9042")); + options.add(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, "default-datacenter"); options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, "none"); options.add(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, (int) Duration.ofSeconds(5).toMillis()); SPRING_BOOT_DEFAULTS = options.build(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraCheckpointRestoreConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraCheckpointRestoreConfiguration.java new file mode 100644 index 000000000000..3a6a68af7f16 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraCheckpointRestoreConfiguration.java @@ -0,0 +1,36 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnCheckpointRestore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration(proxyBeanMethods = false) +@ConditionalOnCheckpointRestore +public class CassandraCheckpointRestoreConfiguration { + + @Bean + @ConditionalOnMissingBean + CassandraSessionLifecycle cassandraSessionLifecycle(CqlSession session) { + return new CassandraSessionLifecycle(session); + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraSessionLifecycle.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraSessionLifecycle.java new file mode 100644 index 000000000000..6a064ea19a58 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraSessionLifecycle.java @@ -0,0 +1,135 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.cassandra; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.datastax.oss.driver.internal.core.metadata.DefaultNode; +import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent; +import com.datastax.oss.driver.internal.core.session.DefaultSession; +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; +import org.crac.Context; +import org.crac.Core; +import org.crac.Resource; + +import org.springframework.context.Lifecycle; + +public class CassandraSessionLifecycle implements Lifecycle { + + private final DefaultSession session; + + private final InternalDriverContext context; + + // We don't require auto startup through SmartLifecycle + private boolean running = true; + + private CracResource resource = new CracResource(); + + private Map lastState; + + public CassandraSessionLifecycle(CqlSession session) { + this.session = (DefaultSession) session; + this.context = (InternalDriverContext) session.getContext(); + // The resource is registered to support automatic checkpoint on refresh + // before DefaultLifecycleProcessor is started. + Core.getGlobalContext().register(this.resource); + } + + @Override + public synchronized void start() { + if (this.running) { + return; + } + this.running = true; + // After start we can remove the resource and let it be GCed; + // this will be handled by Lifecycle invocations. + this.resource = null; + if (this.lastState == null) { + return; + } + for (var e : this.lastState.entrySet()) { + NodeStateEvent changed = NodeStateEvent.changed(NodeState.FORCED_DOWN, e.getValue(), + (DefaultNode) e.getKey()); + this.context.getEventBus().fire(changed); + } + this.lastState = null; + this.context.getControlConnection().reconnectNow(); + } + + @Override + public synchronized void stop() { + if (!this.running) { + return; + } + this.running = false; + + try { + // ControlConnection would try to reconnect when it receives the event that + // node was brought down; seeing the channel to this node already closed + // prevents that. + this.context.getControlConnection().channel().close().get(); + + this.lastState = new HashMap<>(); + ArrayList> closeFutures = new ArrayList<>(); + for (var e : this.session.getPools().entrySet()) { + Node node = e.getKey(); + NodeState currentState = node.getState(); + this.lastState.put(node, currentState); + closeFutures.add(e.getValue().closeFuture()); + this.context.getEventBus() + .fire(NodeStateEvent.changed(currentState, NodeState.FORCED_DOWN, (DefaultNode) node)); + } + + CompletableFutures.allDone(closeFutures).toCompletableFuture().get(); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); + } + catch (ExecutionException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public boolean isRunning() { + return this.running; + } + + private final class CracResource implements org.crac.Resource { + + @Override + public void beforeCheckpoint(Context context) { + stop(); + } + + @Override + public void afterRestore(Context context) { + start(); + } + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java index 4a276d96bd10..c53fb8f7e4a4 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java @@ -38,6 +38,7 @@ import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration; import org.springframework.boot.ssl.NoSuchSslBundleException; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.boot.testsupport.classpath.ClassPathOverrides; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -408,6 +409,17 @@ void driverConfigLoaderWithConfigCreateProfiles() { }); } + @Test + @ClassPathOverrides("org.crac:crac:1.3.0") + void whenCheckpointRestoreIsAvailableCassandraAutoConfigRegistersLifecycleBean() { + this.contextRunner.run((context) -> assertThat(context).hasSingleBean(CassandraSessionLifecycle.class)); + } + + @Test + void whenCheckpointRestoreIsNotAvailableCassandraAutoConfigDoesNotRegisterLifecycleBean() { + this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(CassandraSessionLifecycle.class)); + } + private CassandraConnectionDetails cassandraConnectionDetails() { return new CassandraConnectionDetails() {