Skip to content

Commit b7466e3

Browse files
committed
Add Lifecycle/CRaC support for Cassandra
Signed-off-by: Radim Vansa <[email protected]>
1 parent d04620d commit b7466e3

File tree

5 files changed

+188
-0
lines changed

5 files changed

+188
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,8 @@ dependencies {
232232
optional("org.thymeleaf.extras:thymeleaf-extras-springsecurity6")
233233
optional("redis.clients:jedis")
234234

235+
compileOnly("org.crac:crac")
236+
235237
testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support"))
236238
testImplementation(project(":spring-boot-project:spring-boot-test"))
237239
testImplementation("ch.qos.logback:logback-classic")

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java

+3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.springframework.boot.ssl.SslBundles;
5757
import org.springframework.boot.ssl.SslOptions;
5858
import org.springframework.context.annotation.Bean;
59+
import org.springframework.context.annotation.Import;
5960
import org.springframework.context.annotation.Lazy;
6061
import org.springframework.context.annotation.Scope;
6162
import org.springframework.core.io.Resource;
@@ -80,12 +81,14 @@
8081
@AutoConfiguration
8182
@ConditionalOnClass(CqlSession.class)
8283
@EnableConfigurationProperties(CassandraProperties.class)
84+
@Import({ CassandraCheckpointRestoreConfiguration.class })
8385
public class CassandraAutoConfiguration {
8486

8587
private static final Config SPRING_BOOT_DEFAULTS;
8688
static {
8789
CassandraDriverOptions options = new CassandraDriverOptions();
8890
options.add(DefaultDriverOption.CONTACT_POINTS, Collections.singletonList("127.0.0.1:9042"));
91+
options.add(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, "default-datacenter");
8992
options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, "none");
9093
options.add(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, (int) Duration.ofSeconds(5).toMillis());
9194
SPRING_BOOT_DEFAULTS = options.build();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.cassandra;
18+
19+
import com.datastax.oss.driver.api.core.CqlSession;
20+
21+
import org.springframework.boot.autoconfigure.condition.ConditionalOnCheckpointRestore;
22+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.context.annotation.Configuration;
25+
26+
@Configuration(proxyBeanMethods = false)
27+
@ConditionalOnCheckpointRestore
28+
public class CassandraCheckpointRestoreConfiguration {
29+
30+
@Bean
31+
@ConditionalOnMissingBean
32+
CassandraSessionLifecycle cassandraSessionLifecycle(CqlSession session) {
33+
return new CassandraSessionLifecycle(session);
34+
}
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.cassandra;
18+
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletionStage;
23+
import java.util.concurrent.ExecutionException;
24+
25+
import com.datastax.oss.driver.api.core.CqlSession;
26+
import com.datastax.oss.driver.api.core.metadata.Node;
27+
import com.datastax.oss.driver.api.core.metadata.NodeState;
28+
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
29+
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
30+
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
31+
import com.datastax.oss.driver.internal.core.session.DefaultSession;
32+
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
33+
import org.crac.Context;
34+
import org.crac.Core;
35+
import org.crac.Resource;
36+
37+
import org.springframework.context.Lifecycle;
38+
39+
public class CassandraSessionLifecycle implements Lifecycle {
40+
41+
private final DefaultSession session;
42+
43+
private final InternalDriverContext context;
44+
45+
// We don't require auto startup through SmartLifecycle
46+
private boolean running = true;
47+
48+
private CracResource resource = new CracResource();
49+
50+
private Map<Node, NodeState> lastState;
51+
52+
public CassandraSessionLifecycle(CqlSession session) {
53+
this.session = (DefaultSession) session;
54+
this.context = (InternalDriverContext) session.getContext();
55+
// The resource is registered to support automatic checkpoint on refresh
56+
// before DefaultLifecycleProcessor is started.
57+
Core.getGlobalContext().register(this.resource);
58+
}
59+
60+
@Override
61+
public synchronized void start() {
62+
if (this.running) {
63+
return;
64+
}
65+
this.running = true;
66+
// After start we can remove the resource and let it be GCed;
67+
// this will be handled by Lifecycle invocations.
68+
this.resource = null;
69+
if (this.lastState == null) {
70+
return;
71+
}
72+
for (var e : this.lastState.entrySet()) {
73+
NodeStateEvent changed = NodeStateEvent.changed(NodeState.FORCED_DOWN, e.getValue(),
74+
(DefaultNode) e.getKey());
75+
this.context.getEventBus().fire(changed);
76+
}
77+
this.lastState = null;
78+
this.context.getControlConnection().reconnectNow();
79+
}
80+
81+
@Override
82+
public synchronized void stop() {
83+
if (!this.running) {
84+
return;
85+
}
86+
this.running = false;
87+
88+
try {
89+
// ControlConnection would try to reconnect when it receives the event that
90+
// node was brought down; seeing the channel to this node already closed
91+
// prevents that.
92+
this.context.getControlConnection().channel().close().get();
93+
94+
this.lastState = new HashMap<>();
95+
ArrayList<CompletionStage<Void>> closeFutures = new ArrayList<>();
96+
for (var e : this.session.getPools().entrySet()) {
97+
Node node = e.getKey();
98+
NodeState currentState = node.getState();
99+
this.lastState.put(node, currentState);
100+
closeFutures.add(e.getValue().closeFuture());
101+
this.context.getEventBus()
102+
.fire(NodeStateEvent.changed(currentState, NodeState.FORCED_DOWN, (DefaultNode) node));
103+
}
104+
105+
CompletableFutures.allDone(closeFutures).toCompletableFuture().get();
106+
}
107+
catch (InterruptedException ex) {
108+
Thread.currentThread().interrupt();
109+
throw new RuntimeException(ex);
110+
}
111+
catch (ExecutionException ex) {
112+
throw new RuntimeException(ex);
113+
}
114+
}
115+
116+
@Override
117+
public boolean isRunning() {
118+
return this.running;
119+
}
120+
121+
private final class CracResource implements org.crac.Resource {
122+
123+
@Override
124+
public void beforeCheckpoint(Context<? extends Resource> context) {
125+
stop();
126+
}
127+
128+
@Override
129+
public void afterRestore(Context<? extends Resource> context) {
130+
start();
131+
}
132+
133+
}
134+
135+
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java

+12
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration;
3939
import org.springframework.boot.ssl.NoSuchSslBundleException;
4040
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
41+
import org.springframework.boot.testsupport.classpath.ClassPathOverrides;
4142
import org.springframework.context.annotation.Bean;
4243
import org.springframework.context.annotation.Configuration;
4344

@@ -408,6 +409,17 @@ void driverConfigLoaderWithConfigCreateProfiles() {
408409
});
409410
}
410411

412+
@Test
413+
@ClassPathOverrides("org.crac:crac:1.3.0")
414+
void whenCheckpointRestoreIsAvailableCassandraAutoConfigRegistersLifecycleBean() {
415+
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(CassandraSessionLifecycle.class));
416+
}
417+
418+
@Test
419+
void whenCheckpointRestoreIsNotAvailableCassandraAutoConfigDoesNotRegisterLifecycleBean() {
420+
this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(CassandraSessionLifecycle.class));
421+
}
422+
411423
private CassandraConnectionDetails cassandraConnectionDetails() {
412424
return new CassandraConnectionDetails() {
413425

0 commit comments

Comments
 (0)