From b61d340451b87272d48cabf458153b874fcc14db Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:40:47 +0530 Subject: [PATCH 01/14] feat: Introduce `MutationCache` interface, add `PersistentCache` implementation, and refactor `InMemoryCache` to implement interface. (cherry picked from commit 14e284a34859c3cd268ac4b94bb5088a6d732183) --- .../InMemoryCache.java} | 27 +--- .../datastax/oss/cdc/cache/MutationCache.java | 41 ++++++ .../oss/cdc/cache/PersistentCache.java | 124 ++++++++++++++++++ 3 files changed, 169 insertions(+), 23 deletions(-) rename connector/src/main/java/com/datastax/oss/cdc/{MutationCache.java => cache/InMemoryCache.java} (67%) create mode 100644 connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java create mode 100644 connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java diff --git a/connector/src/main/java/com/datastax/oss/cdc/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java similarity index 67% rename from connector/src/main/java/com/datastax/oss/cdc/MutationCache.java rename to connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java index ff5d4a8c..92fca06b 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/MutationCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java @@ -1,19 +1,4 @@ -/** - * Copyright DataStax, Inc 2021. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.datastax.oss.cdc; +package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -24,19 +9,15 @@ import java.util.List; import java.util.concurrent.TimeUnit; -/** - * Keep MD5 digests to deduplicate Cassandra mutations - */ -public class MutationCache { - +public class InMemoryCache implements MutationCache { Cache> mutationCache; /** - * Max number of cached digest per cached entry. + * Max number of cached digests per cached entry. */ long maxDigests; - public MutationCache(long maxDigests, long maxCapacity, Duration expireAfter) { + public InMemoryCache(long maxDigests, long maxCapacity, Duration expireAfter) { this.maxDigests = maxDigests; mutationCache = Caffeine.newBuilder() .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java new file mode 100644 index 00000000..63522717 --- /dev/null +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java @@ -0,0 +1,41 @@ +package com.datastax.oss.cdc.cache; + +import com.github.benmanes.caffeine.cache.stats.CacheStats; + +import java.util.List; + +public interface MutationCache { + /** + * Add a mutation MD5 digest to the cache for the given mutation key. + * @param mutationKey the key for the mutation, typically a partition key or a unique identifier + * @param md5Digest the MD5 digest of the mutation to be added + * @return a list of MD5 digests for the given mutation key, which may include the newly added digest + */ + List addMutationMd5(K mutationKey, String md5Digest); + + /** + * Retrieve the list of MD5 digests for the given mutation key. 
+ * @param mutationKey the key for the mutation + * @return a list of MD5 digests associated with the mutation key, or an empty list if none exist + */ + List getMutationCRCs(K mutationKey); + + /** + * Check if a mutation with the given key and MD5 digest has already been processed. + * @param mutationKey the key for the mutation + * @param md5Digest the MD5 digest of the mutation + * @return true if the mutation has been processed, false otherwise + */ + boolean isMutationProcessed(K mutationKey, String md5Digest); + + /** + * Gives the current statistics of the cache, such as hit rate, miss rate, and size. + * Caffeine Statistics wiki + */ + CacheStats stats(); + + /** + * Gives the estimated size of the cache. + */ + long estimatedSize(); +} diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java new file mode 100644 index 00000000..8041988e --- /dev/null +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java @@ -0,0 +1,124 @@ +package com.datastax.oss.cdc.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import org.rocksdb.Options; +import org.rocksdb.RocksDBException; +import org.rocksdb.TtlDB; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public class PersistentCache implements MutationCache { + /** + * The mutation cache + */ + Cache> mutationCache; + + private final TtlDB rocksDB; + /** + * Max number of cached digests per cached entry. 
+ */ + long maxDigests; + + private final Function keySerializer; + + static { + TtlDB.loadLibrary(); + } + + public PersistentCache(long maxDigests, long maxCapacity, Duration expireAfter, String dbPath, Function keySerializer) throws RocksDBException { + this.maxDigests = maxDigests; + this.keySerializer = keySerializer; + + Options options = new Options().setCreateIfMissing(true); + this.rocksDB = TtlDB.open(options, dbPath, (int) expireAfter.getSeconds(), false); + + this.mutationCache = Caffeine.newBuilder() + .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) + .maximumSize(maxCapacity) + .executor(Runnable::run) + .recordStats() + .removalListener((K key, List value, RemovalCause cause) -> { + try { + rocksDB.delete(this.keySerializer.apply(key)); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + }) + .build(); + } + + + private List valueDeserializer(byte[] data){ + return data == null || data.length == 0 ? null : List.of(new String(data).split(",")); + } + + private byte[] valueSerializer(List data){ + return data.stream() + .reduce((s1, s2) -> s1 + "," + s2) + .orElse("") + .getBytes(); + } + + public List getMutationCRCs(K mutationKey) { + return mutationCache.get(mutationKey, k -> { + try { + return valueDeserializer(rocksDB.get(this.keySerializer.apply(k))); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + }); + } + + public void putMutationCRCs(K key, List value) { + mutationCache.asMap().compute(key, (k, v) -> { + try { + rocksDB.put(keySerializer.apply(k), valueSerializer(value)); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + return value; + }); + } + + public List addMutationMd5(K mutationKey, String md5Digest) { + List crcs = getMutationCRCs(mutationKey); + if(crcs == null) { + crcs = new ArrayList<>(1); + crcs.add(md5Digest); + } else { + if (!crcs.contains(md5Digest)) { + if (crcs.size() >= maxDigests) { + // remove the oldest digest + crcs.remove(0); + } + 
crcs.add(md5Digest); + } + } + putMutationCRCs(mutationKey, crcs); + return crcs; + } + + public boolean isMutationProcessed(K mutationKey, String md5Digest) { + List digests = getMutationCRCs(mutationKey); + return digests != null && digests.contains(md5Digest); + } + + public CacheStats stats() { + return mutationCache.stats(); + } + + public long estimatedSize() { + return mutationCache.estimatedSize(); + } + + public void close() { + rocksDB.close(); + } +} From 10334bc85a4b5286c5a4dca8e68bfca4f1cb5868 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:41:05 +0530 Subject: [PATCH 02/14] feat: Add RocksDB dependency to project configurations (cherry picked from commit 3fb06b5a4ee18f36ab0f9778c2f8b90e7be95656) --- connector/build.gradle | 1 + gradle.properties | 1 + 2 files changed, 2 insertions(+) diff --git a/connector/build.gradle b/connector/build.gradle index 3b57fe8a..85e60689 100644 --- a/connector/build.gradle +++ b/connector/build.gradle @@ -32,6 +32,7 @@ sourceSets { dependencies { implementation project(':commons') implementation("com.github.ben-manes.caffeine:caffeine:${caffeineVersion}") + implementation("org.rocksdb:rocksdbjni:${rocksdbVersion}") implementation("io.vavr:vavr:${vavrVersion}") implementation "com.datastax.oss:java-driver-core:${ossDriverVersion}" implementation "com.datastax.oss:java-driver-query-builder:${ossDriverVersion}" diff --git a/gradle.properties b/gradle.properties index 55098650..b9cae411 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,6 +24,7 @@ kafkaVersion=3.4.0 vavrVersion=0.10.3 testContainersVersion=1.19.1 caffeineVersion=2.8.8 +rocksdbVersion=9.4.0 guavaVersion=30.1-jre messagingConnectorsCommonsVersion=1.0.14 slf4jVersion=1.7.30 From 1f9dae55d9c484dd8a85bdc325d0104bddd8be86 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:41:25 +0530 Subject: [PATCH 03/14] feat: Add persistent cache directory configuration to Cassandra source connector (cherry picked from commit 
6a61963db98c97df31bb4f793d96e9ec945567a7) --- .../oss/cdc/CassandraSourceConnectorConfig.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java b/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java index f04d5ffe..6dad70e5 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java +++ b/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java @@ -66,6 +66,8 @@ public class CassandraSourceConnectorConfig { public static final String CACHE_MAX_DIGESTS_CONFIG = "cache.max.digest"; public static final String CACHE_MAX_CAPACITY_CONFIG = "cache.max.capacity"; public static final String CACHE_EXPIRE_AFTER_MS_CONFIG = "cache.expire.after.ms"; + public static final String CACHE_PERSISTENT_DIRECTORY_CONFIG = "cache.persistent.directory"; + public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter"; @@ -195,6 +197,13 @@ public class CassandraSourceConnectorConfig { ConfigDef.Importance.HIGH, "The maximum number of digest per mutation cache entry, with a default set to 3", "CQL Read cache", 1, ConfigDef.Width.NONE, "CacheMaxDigest") + .define(CACHE_PERSISTENT_DIRECTORY_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.HIGH, + "The directory where the persistent mutation cache will be stored. 
Formated as `{cache.persistent.directory}/{source-name}-{instance-id}` " + + "If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache.", + "CQL Read cache", 1, ConfigDef.Width.NONE, "CachePersistentDirectory") .define(CACHE_MAX_CAPACITY_CONFIG, ConfigDef.Type.LONG, "32767", @@ -661,6 +670,10 @@ public long getCacheMaxDigests() { return globalConfig.getLong(CACHE_MAX_DIGESTS_CONFIG); } + public String getCachePersistentDirectory() { + return globalConfig.getString(CACHE_PERSISTENT_DIRECTORY_CONFIG); + } + public long getCacheMaxCapacity() { return globalConfig.getLong(CACHE_MAX_CAPACITY_CONFIG); } @@ -782,6 +795,7 @@ public String toString() { + " " + QUERY_BACKOFF_IN_MS_CONFIG + ": %d%n" + " " + QUERY_MAX_BACKOFF_IN_SEC_CONFIG + ": %d%n" + " " + CACHE_MAX_DIGESTS_CONFIG + ": %d%n" + + " " + CACHE_PERSISTENT_DIRECTORY_CONFIG + ": %s%n" + " " + CACHE_MAX_CAPACITY_CONFIG + ": %d%n" + " " + CACHE_EXPIRE_AFTER_MS_CONFIG + ": %d%n" + " " + CACHE_ONLY_IF_COORDINATOR_MATCH + ": %s%n" @@ -805,6 +819,7 @@ public String toString() { getQueryBackoffInMs(), getQueryMaxBackoffInSec(), getCacheMaxDigests(), + getCachePersistentDirectory(), getCacheMaxCapacity(), getCacheExpireAfterMs(), getCacheOnlyIfCoordinatorMatch(), From e4563bbe86974389f18db5559be5b2f8df12e3f1 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:41:45 +0530 Subject: [PATCH 04/14] feat: Refactor mutation cache initialization to support persistent caching (cherry picked from commit 75d91f7be46cefcd9ad7840762541b72b5c54b0c) --- .../oss/pulsar/source/CassandraSource.java | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java index 21716a59..d32c49cd 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java +++ 
b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java @@ -20,9 +20,11 @@ import com.datastax.oss.cdc.ConfigUtil; import com.datastax.oss.cdc.Constants; import com.datastax.oss.cdc.CqlLogicalTypes; -import com.datastax.oss.cdc.MutationCache; import com.datastax.oss.cdc.MutationValue; import com.datastax.oss.cdc.Version; +import com.datastax.oss.cdc.cache.MutationCache; +import com.datastax.oss.cdc.cache.InMemoryCache; +import com.datastax.oss.cdc.cache.PersistentCache; import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Row; @@ -66,6 +68,7 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; +import org.rocksdb.RocksDBException; import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; @@ -312,10 +315,8 @@ public void open(Map config, SourceContext sourceContext) { consumerBuilder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange()); } this.consumer = consumerBuilder.subscribe(); - this.mutationCache = new MutationCache<>( - this.config.getCacheMaxDigests(), - this.config.getCacheMaxCapacity(), - Duration.ofMillis(this.config.getCacheExpireAfterMs())); + this.mutationCache = this.getMutationCache(); + log.info("Starting source connector topic={} subscription={} query.executors={}", dirtyTopicName, this.config.getEventsSubscriptionName(), @@ -326,6 +327,30 @@ public void open(Map config, SourceContext sourceContext) { } } + /** + * Get the mutation cache implementation. 
+ * @return the mutation cache implementation + * @throws RocksDBException if the mutation cache cannot be created due to RocksDB issues + */ + private MutationCache getMutationCache() throws RocksDBException { + if(this.config.getCachePersistentDirectory() != null){ + return new PersistentCache<>( + this.config.getCacheMaxDigests(), + this.config.getCacheMaxCapacity(), + Duration.ofMillis(this.config.getCacheExpireAfterMs()), + this.config.getCachePersistentDirectory() + "/" + + this.sourceContext.getSourceName() + "-" + this.sourceContext.getInstanceId(), + String::getBytes + ); + } + else { + return new InMemoryCache<>( + this.config.getCacheMaxDigests(), + this.config.getCacheMaxCapacity(), + Duration.ofMillis(this.config.getCacheExpireAfterMs())); + } + } + void maybeInitCassandraClient() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException { if (this.cassandraClient == null) { synchronized (this) { From 758630b14cd6c8fda653d7a88adf67cb0e41ad1e Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:42:19 +0530 Subject: [PATCH 05/14] feat: Update MutationCacheTests to use InMemoryCache implementation (cherry picked from commit b1a617f39395e1f171f8e4385bc57c7d49304ec4) --- .../datastax/oss/cdc/MutationCacheTests.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java index e473875f..a0868067 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java @@ -15,17 +15,22 @@ */ package com.datastax.oss.cdc; +import com.datastax.oss.cdc.cache.InMemoryCache; +import com.datastax.oss.cdc.cache.MutationCache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; import 
java.time.Duration; +import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; public class MutationCacheTests { @Test public final void testMaxDigests() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofHours(1)); + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); mutationCache.addMutationMd5("mutation1","digest1"); mutationCache.addMutationMd5("mutation1","digest2"); mutationCache.addMutationMd5("mutation1","digest3"); @@ -35,20 +40,20 @@ public final void testMaxDigests() throws Exception { @Test public final void testIsProcessed() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofHours(1)); - assertEquals(false,mutationCache.isMutationProcessed("mutation1","digest1")); + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); mutationCache.addMutationMd5("mutation1","digest1"); - assertEquals(true, mutationCache.isMutationProcessed("mutation1","digest1")); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); } @Test public final void testExpireAfter() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofSeconds(1)); - assertEquals(false, mutationCache.isMutationProcessed("mutation1","digest1")); + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofSeconds(1)); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); mutationCache.addMutationMd5("mutation1","digest1"); - assertEquals(true, mutationCache.isMutationProcessed("mutation1","digest1")); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); Thread.sleep(2000); - assertEquals(false, mutationCache.isMutationProcessed("mutation1","digest1")); + assertFalse(mutationCache.isMutationProcessed("mutation1", 
"digest1")); } } From ed979b8a53c9d5b7b9dbb76d1bc9b589fee2f97f Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:43:10 +0530 Subject: [PATCH 06/14] feat: Add test for max capacity in InMemoryCache implementation (cherry picked from commit dc72d73e35000608db5f49ea3f3dc6464e49bb18) --- .../datastax/oss/cdc/MutationCacheTests.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java index a0868067..085fbd28 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java @@ -56,4 +56,32 @@ public final void testExpireAfter() throws Exception { assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); } + @Test + public final void testMaxCapacity() throws Exception { + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); + + // Access and modify the private field using reflection + Field field = InMemoryCache.class.getDeclaredField("mutationCache"); + field.setAccessible(true); + field.set(mutationCache, Caffeine.newBuilder() + .expireAfterWrite(Duration.ofHours(1).getSeconds(), TimeUnit.SECONDS) + .maximumSize(10) + .recordStats() + .executor(Runnable::run) // https://github.com/ben-manes/caffeine/wiki/Testing + .build() + ); + + for (int i = 0; i <20; i++) { + mutationCache.addMutationMd5("mutation" + i, "digest" + i); + } + + int count = 0; + for (int i = 0; i < 20; i++) { + if(mutationCache.getMutationCRCs("mutation" + i) != null) { + count++; + } + } + assertEquals(10, count); + } + } From 01834cf937c396f3a6a63f03a0edbcb136aa7611 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 11:43:31 +0530 Subject: [PATCH 07/14] docs: persistent cache directory setting in Cassandra source connector (cherry picked from commit 1d1a1f7403509a92a15ddffc7707e1f8c8774f22) 
--- .../core/modules/ROOT/partials/cfgCassandraSource.adoc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc b/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc index 2a29bee3..b9645390 100644 --- a/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc +++ b/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc @@ -187,4 +187,10 @@ | | true +| *cache.persistent.directory* +| The directory where the persistent mutation cache will be stored. Formatted as `{cache.persistent.directory}/{source-name}-{instance-id}`. This setting is only used if the cache is persistent. If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache. +| string +| +| /data/source-cache + |=== From 73d751096201c3af5716fc004987f90eaa747b93 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 12:38:23 +0530 Subject: [PATCH 08/14] feat: Update the mutation cache to conditionally delete keys from RocksDB (cherry picked from commit 618c993ae63bc7b1e1410d54410cbfd0b66b0109) --- .../com/datastax/oss/cdc/cache/PersistentCache.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java index 8041988e..42ec0b34 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java @@ -4,6 +4,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.annotations.VisibleForTesting; import org.rocksdb.Options; import org.rocksdb.RocksDBException; import org.rocksdb.TtlDB; @@ -18,7 +19,7 @@ public class PersistentCache implements MutationCache { /** * The mutation 
cache */ - Cache> mutationCache; + private final Cache> mutationCache; private final TtlDB rocksDB; /** @@ -42,11 +43,14 @@ public PersistentCache(long maxDigests, long maxCapacity, Duration expireAfter, this.mutationCache = Caffeine.newBuilder() .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) .maximumSize(maxCapacity) - .executor(Runnable::run) .recordStats() .removalListener((K key, List value, RemovalCause cause) -> { try { - rocksDB.delete(this.keySerializer.apply(key)); + // If the removal cause is not SIZE, we delete the key from RocksDB + // This is to avoid deleting the key when it is removed due to the size limit + if (cause != RemovalCause.SIZE ) { + rocksDB.delete(this.keySerializer.apply(key)); + } } catch (RocksDBException e) { throw new RuntimeException(e); } From 1422bcd1bcd037a82c00bb125cca0ea067dff31e Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 12:38:32 +0530 Subject: [PATCH 09/14] refactor: Move MutationCacheTests to the cache package (cherry picked from commit e3d7973458b4d1bd74af222c9b9675d25a16bd82) --- .../com/datastax/oss/cdc/{ => cache}/MutationCacheTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) rename connector/src/test/java/com/datastax/oss/cdc/{ => cache}/MutationCacheTests.java (96%) diff --git a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java similarity index 96% rename from connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java rename to connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java index 085fbd28..9fa1918c 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java @@ -13,10 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.datastax.oss.cdc; +package com.datastax.oss.cdc.cache; -import com.datastax.oss.cdc.cache.InMemoryCache; -import com.datastax.oss.cdc.cache.MutationCache; import com.github.benmanes.caffeine.cache.Caffeine; import org.junit.jupiter.api.Test; From 177a363664d8f4be167b756dc20b7790ddee06c9 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Thu, 26 Jun 2025 12:38:34 +0530 Subject: [PATCH 10/14] refactor: Move MutationCacheTests to the cache package (cherry picked from commit 9788b47c93f1867107850036651e77faff45ab2b) --- .../oss/cdc/cache/PersistentCacheTests.java | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java diff --git a/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java new file mode 100644 index 00000000..5e70792d --- /dev/null +++ b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java @@ -0,0 +1,95 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datastax.oss.cdc.cache; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.google.common.testing.FakeTicker; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.rocksdb.RocksDBException; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; + +public class PersistentCacheTests { + private String path; + + @BeforeEach + public void setUp() { + path = "./rocksdb_mutation_cache_" + (new Random()).nextInt(); + } + + @AfterEach + public void tearDown() { + try { + FileUtils.deleteDirectory(new File(path)); + } catch (Exception e) { + // Ignore any exceptions during cleanup + } + } + + @Test + public final void testMaxDigests() throws Exception { + MutationCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + mutationCache.addMutationMd5("mutation1","digest1"); + mutationCache.addMutationMd5("mutation1","digest2"); + mutationCache.addMutationMd5("mutation1","digest3"); + mutationCache.addMutationMd5("mutation1","digest4"); + assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size()); + } + + @Test + public final void testIsProcessed() throws Exception { + MutationCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); + mutationCache.addMutationMd5("mutation1","digest1"); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); + + } + + @Test + public final void testPersistence() throws Exception { + PersistentCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + 
mutationCache.addMutationMd5("mutation1","digest1"); + mutationCache.addMutationMd5("mutation1","digest2"); + mutationCache.addMutationMd5("mutation1","digest3"); + + mutationCache.close(); + mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size()); + } + + @Test + public final void testMaxCapacity() throws Exception { + MutationCache mutationCache = new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes); + + for (int i = 0; i <20; i++) { + mutationCache.addMutationMd5("mutation" + i, "digest" + i); + } + + for (int i = 0; i < 20; i++) { + assertEquals(List.of("digest" + i), mutationCache.getMutationCRCs("mutation" + i)); + } + } +} From aad0eb496516c0372f14750e3ba4b23396b73632 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Fri, 27 Jun 2025 10:01:00 +0530 Subject: [PATCH 11/14] chore: Add Apache License header to cache-related Java files (cherry picked from commit b86be914ebfa5a1affd5c2709fa11875dbf6afbf) --- .../datastax/oss/cdc/cache/InMemoryCache.java | 15 +++++++++++++++ .../datastax/oss/cdc/cache/MutationCache.java | 15 +++++++++++++++ .../datastax/oss/cdc/cache/PersistentCache.java | 16 +++++++++++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java index 92fca06b..b7c74006 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.Cache; diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java index 63522717..0d83079f 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.stats.CacheStats; diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java index 42ec0b34..544b6845 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java @@ -1,10 +1,24 @@ +/** + * Copyright DataStax, Inc 2021. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.stats.CacheStats; -import com.google.common.annotations.VisibleForTesting; import org.rocksdb.Options; import org.rocksdb.RocksDBException; import org.rocksdb.TtlDB; From e86d5d0a33a2bb058cf166d1efb18a0dde83d688 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Mon, 28 Jul 2025 09:56:33 +0530 Subject: [PATCH 12/14] ci: extend test job timeout to 360 minutes (cherry picked from commit 4a94cdbe20326d2488f00087185e7cd5b780cf90) --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 48b95fca..af4fa504 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,7 @@ jobs: needs: build name: Test runs-on: ubuntu-latest - timeout-minutes: 120 + timeout-minutes: 360 strategy: fail-fast: false matrix: From e240df653408a308b943d63cc2a9fde14efacb2e Mon Sep 17 00:00:00 2001 From: Arkadip Date: Mon, 28 Jul 2025 11:10:43 +0530 Subject: [PATCH 13/14] test: Fix PersistentCacheTests to resolve assertion method compatibility (cherry picked from commit 2387798f0edb977a2d7ab8df5c88109ecb618d38) --- .../com/datastax/oss/cdc/cache/PersistentCacheTests.java | 9 +++------ 1 file 
changed, 3 insertions(+), 6 deletions(-) diff --git a/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java index 5e70792d..fbb7f1c5 100644 --- a/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java +++ b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java @@ -15,20 +15,15 @@ */ package com.datastax.oss.cdc.cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.google.common.testing.FakeTicker; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.rocksdb.RocksDBException; import java.io.File; import java.time.Duration; import java.util.List; import java.util.Random; -import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.*; @@ -89,7 +84,9 @@ public final void testMaxCapacity() throws Exception { } for (int i = 0; i < 20; i++) { - assertEquals(List.of("digest" + i), mutationCache.getMutationCRCs("mutation" + i)); + List expected = new java.util.ArrayList<>(); + expected.add("digest" + i); + assertEquals(expected, mutationCache.getMutationCRCs("mutation" + i)); } } } From d0f550baea69692e9a173e79eeae40ea473df754 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Mon, 28 Jul 2025 18:07:56 +0530 Subject: [PATCH 14/14] refactor(ChaosNetworkContainer): update tc-image to use ghcr.io/alexei-led/pumba-debian-nettools (cherry picked from commit 354338a6a7e39a0d2b4d06ea239932e5034927ca) --- .../java/com/datastax/testcontainers/ChaosNetworkContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java index fda04663..a08c7249 100644 
--- a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java +++ b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java @@ -34,7 +34,7 @@ public class ChaosNetworkContainer> ext public ChaosNetworkContainer(String targetContainer, String pause) { super(PUMBA_IMAGE); - setCommand("--log-level debug netem --tc-image gaiadocker/iproute2 --duration " + pause + " loss --percent 100 " + targetContainer); + setCommand("--log-level debug netem --tc-image ghcr.io/alexei-led/pumba-debian-nettools --duration " + pause + " loss --percent 100 " + targetContainer); addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE); setWaitStrategy(Wait.forLogMessage(".*tc container created.*", 1)); withLogConsumer(o -> {