Skip to content

Commit f46bb53

Browse files
Programmatix datacouch 1145 transaction support (#1446)
* Porting to SDK-integrated version of transactions The transactions logic exists in the Java SDK as of 3.3.0, with a slightly different API. This is the first effort at the port, which literally just compiles. It will not run as crucial code has been commented and todo-ed. There is work remaining to figure out how to complete the port, as some crucial parts (such as ctx.commit() and ctx.rollback()) have been intentionally removed. * Continuing work to get the ExtSDKIntegration port working Trying to transition to CallbackPreferring manager. * Added CouchbaseSimpleCallbackTransactionManager, the simplest possible implementation of CallbackPreferringTransactionManager, combined with a simpler approach to ThreadLocal storage in ReactiveInsertByIdSupport. Test 'commitShouldPersistTxEntriesOfTxAnnotatedMethod' is now passing. * Adding WIP get-and-replace @transactional support (Not yet working as CAS/version field in Person is not populated correctly.) * Transitioning to use CoreTransactionAttemptContext. Tests may fail. * Removing AttemptContextReactiveAccessor Don't think we need this, as we can pass around CoreTransactionAttemptContext instead, which gives access to a lot of internals. * Removing TransactionsReactive Would prefer not to C&P a huge class out of the transaction internals, and don't think we need it. * Removing some files not currently used To reduce & simplify the amount of code to look at. Some don't seem to be used in any branch, some just aren't used in this branch. * Removing CouchbaseTransactionInterceptor As per offline discussion, CallbackPreferringPlatformTransactionManager is perhaps the optimal solution. * Copying @transactional tests out into separate class * Tidyup * Tidyup test names * Verify GenericSupport is on same thread before and after transactional operation * Refactoring CouchbaseSimpleCallbackTransactionManager ThreadLocalStorage management * Using latest java-client * ReactiveReplaceByIdSupport - Fixing use of CAS now have CoreTransactionAttemptContext. Removing unused code. * ReactiveInsertByIdSupport - fixing use of reactive vs non-reactive, and CAS * Merging upstream * Remove incorrect thread check (.doOnNext could execute on a different thread) * Completing merge from upstream * Removing unused classes * Give GenericSupport a better name * Reject at runtime options that aren't supported in a transaction * Fixing some small todos, partly by removing unused coe * Fix runtime option checks * Simplifying CouchbaseSimpleCallbackTransactionManager ThreadLocalStorage Standardising on ReactiveCouchbaseResourceHolder rather than holding CoreTransactionAttemptContext too * Removing version from CouchbaseDocument Can't recall why I added this, and tests pass without it * Improving CouchbaseTransactionalIntegrationTests and adding more tests * Reject operations that aren't allowed in a transaction (upsertById etc.) * Improve handling of CAS mismatch By calling CoreTransactionAttemptContext.operationFailed, it ensures that internal state is set. So even if the user catches the exception, the transaction still behaves as it should. * Removing a now-redundant non-transactional check on upsertById I missed this when adding TransactionalSupport.verifyNotInTransaction here. * Support @transactional options timeout and isolation level Co-authored-by: Graham Pople <[email protected]>
1 parent 9e7fdde commit f46bb53

File tree

77 files changed

+1864
-1782
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1864
-1782
lines changed

src/main/java/com/couchbase/client/java/Cluster.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@
103103
* The SDK will only work against Couchbase Server 5.0 and later, because RBAC (role-based access control) is a first
104104
* class concept since 3.0 and therefore required.
105105
*/
106-
// todo gp is this required?
106+
// todo gpx as per discussion with miker - if required, ClusterInterface will be added to the SDK instead
107107
public class Cluster implements ClusterInterface {
108108

109109
/**
@@ -256,9 +256,9 @@ public static Cluster connect(final String connectionString, final ClusterOption
256256
final ClusterOptions.Built opts = options.build();
257257
final Supplier<ClusterEnvironment> environmentSupplier = extractClusterEnvironment(connectionString, opts);
258258
return new Cluster(
259-
environmentSupplier,
260-
opts.authenticator(),
261-
seedNodesFromConnectionString(connectionString, environmentSupplier.get())
259+
environmentSupplier,
260+
opts.authenticator(),
261+
seedNodesFromConnectionString(connectionString, environmentSupplier.get())
262262
);
263263
}
264264

src/main/java/com/couchbase/client/java/ClusterInterface.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public interface ClusterInterface {
8383

8484
//AnalyticsResult analyticsQuery(String statement);
8585

86-
// AnalyticsResult analyticsQuery(String statement, AnalyticsOptions options);
86+
// AnalyticsResult analyticsQuery(String statement, AnalyticsOptions options);
8787

8888
SearchResult searchQuery(String indexName, SearchQuery query);
8989

src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,6 @@
4444
*/
4545
public class AttemptContextReactiveAccessor {
4646

47-
public static ReactiveTransactionAttemptContext getACR(TransactionAttemptContext attemptContext) {
48-
// return attemptContext.ctx();
49-
// todo gp is this access needed. Could hold the raw CoreTransactionAttemptContext instead.
50-
return null;
51-
}
52-
5347
public static ReactiveTransactions reactive(Transactions transactions) {
5448
try {
5549
Field field = Transactions.class.getDeclaredField("reactive");
@@ -212,26 +206,18 @@ public static TransactionResult run(Transactions transactions, Consumer<Transact
212206
return reactive(transactions).runBlocking(transactionLogic, coreTransactionOptions);
213207
}
214208

215-
// todo gp if needed let's expose in the SDK
216-
// static private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
217-
// StringBuilder sb = new StringBuilder();
218-
// sb.append("library version: ");
219-
// sb.append(TransactionsReactive.class.getPackage().getImplementationVersion());
220-
// sb.append(" config: ");
221-
// sb.append("atrs=");
222-
// sb.append(config.numAtrs());
223-
// sb.append(", metadataCollection=");
224-
// sb.append(config.metadataCollection());
225-
// sb.append(", expiry=");
226-
// sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis());
227-
// sb.append("msecs durability=");
228-
// sb.append(config.durabilityLevel());
229-
// sb.append(" per-txn config=");
230-
// sb.append(" durability=");
231-
// sb.append(perConfig.durabilityLevel());
232-
// sb.append(", supported=");
233-
// sb.append(Supported.SUPPORTED);
234-
// return sb.toString();
235-
// }
209+
CoreTransactionAttemptContext coreTransactionsReactive;
210+
try {
211+
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
212+
field.setAccessible(true);
213+
coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
214+
} catch (Throwable err) {
215+
throw new RuntimeException(err);
216+
}
217+
return coreTransactionsReactive;
218+
}
236219

220+
public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
221+
return new ReactiveTransactionAttemptContext(core, jsonSerializer);
222+
}
237223
}

src/main/java/com/example/demo/CouchbaseTransactionalTemplate.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ public <T> SpringTransactionGetResult<T> findById(String id, Class<T> domainType
2626
CoreTransactionAttemptContext ctx = getContext();
2727
CoreTransactionGetResult getResult = ctx.get( makeCollectionIdentifier(template.getCouchbaseClientFactory().getDefaultCollection().async()) , id).block();
2828

29-
// todo gp getResult.cas() is no longer exposed - required?
30-
T t = template.support().decodeEntity(id, new String(getResult.contentAsBytes()), 0, domainType,
29+
T t = template.support().decodeEntity(id, new String(getResult.contentAsBytes()), getResult.cas(), domainType,
3130
null, null, null);
3231
return new SpringTransactionGetResult<>(t, getResult);
3332
} catch (Exception e) {

src/main/java/com/example/demo/SpringTransactionGetResult.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public CoreTransactionGetResult getInner() {
2424
@Override
2525
public String toString() {
2626
return "SpringTransactionGetResult{" +
27-
"value=" + value +
28-
", inner=" + inner +
29-
'}';
27+
"value=" + value +
28+
", inner=" + inner +
29+
'}';
3030
}
3131
}

src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public interface CouchbaseClientFactory extends Closeable {
7777
PersistenceExceptionTranslator getExceptionTranslator();
7878

7979
CoreTransactionAttemptContext getCore(TransactionOptions options,
80-
CoreTransactionAttemptContext atr);
80+
CoreTransactionAttemptContext atr);
8181

8282
//CouchbaseClientFactory with(CouchbaseTransactionalOperator txOp);
8383

src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.couchbase.client.java.Scope;
2424
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
2525
import com.couchbase.client.java.transactions.config.TransactionOptions;
26-
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
2726
import org.springframework.data.couchbase.transaction.CouchbaseTransactionalOperator;
2827
import org.springframework.data.couchbase.transaction.ReactiveCouchbaseResourceHolder;
2928
import reactor.core.publisher.Mono;
@@ -107,8 +106,7 @@ public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*
107106

108107
void close() throws IOException;
109108

110-
ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options,
111-
CoreTransactionAttemptContext ctx);
109+
ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options, CoreTransactionAttemptContext ctx);
112110

113111
/*
114112
* (non-Javadoc)

src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,21 @@ public class SimpleCouchbaseClientFactory implements CouchbaseClientFactory {
5353
//private JsonSerializer serializer = null;
5454

5555
public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
56-
final String bucketName) {
56+
final String bucketName) {
5757
this(connectionString, authenticator, bucketName, null);
5858
}
5959

6060
public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
61-
final String bucketName, final String scopeName) {
61+
final String bucketName, final String scopeName) {
6262
this(new OwnedSupplier<>(Cluster.connect(connectionString, ClusterOptions.clusterOptions(authenticator)
63-
// todo gp disabling cleanupLostAttempts to simplify output during development
64-
.environment(env -> env.transactionsConfig(
65-
TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false)))))),
63+
// todo gp disabling cleanupLostAttempts to simplify output during development
64+
.environment(env -> env.transactionsConfig(
65+
TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false)))))),
6666
bucketName, scopeName);
6767
}
6868

6969
public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
70-
final String bucketName, final String scopeName, final ClusterEnvironment environment) {
70+
final String bucketName, final String scopeName, final ClusterEnvironment environment) {
7171
this(
7272
new OwnedSupplier<>(
7373
Cluster.connect(connectionString, ClusterOptions.clusterOptions(authenticator).environment(environment))),
@@ -81,7 +81,7 @@ public SimpleCouchbaseClientFactory(final Cluster cluster, final String bucketNa
8181
}
8282

8383
private SimpleCouchbaseClientFactory(final Supplier<Cluster> cluster, final String bucketName,
84-
final String scopeName) {
84+
final String scopeName) {
8585
this.cluster = cluster;
8686
this.bucket = cluster.get().bucket(bucketName);
8787
this.scope = scopeName == null ? bucket.defaultScope() : bucket.scope(scopeName);

src/main/java/org/springframework/data/couchbase/SimpleReactiveCouchbaseClientFactory.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class SimpleReactiveCouchbaseClientFactory implements ReactiveCouchbaseCl
3838
CouchbaseTransactionalOperator transactionalOperator;
3939

4040
public SimpleReactiveCouchbaseClientFactory(Cluster cluster, String bucketName, String scopeName,
41-
CouchbaseTransactionalOperator transactionalOperator) {
41+
CouchbaseTransactionalOperator transactionalOperator) {
4242
this.cluster = Mono.just(cluster);
4343
this.theCluster = cluster;
4444
this.bucketName = bucketName;
@@ -146,15 +146,13 @@ public void close() {
146146
}
147147

148148
@Override
149-
public Mono<ReactiveCouchbaseResourceHolder> getTransactionResources(TransactionOptions options) { // hopefully this
150-
// gets filled in
151-
// later
152-
return Mono.just(new ReactiveCouchbaseResourceHolder(null));
149+
public Mono<ReactiveCouchbaseResourceHolder> getTransactionResources(TransactionOptions options) {
150+
return Mono.just(new ReactiveCouchbaseResourceHolder(null));
153151
}
154152

155153
@Override
156154
public ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options,
157-
CoreTransactionAttemptContext atr) {
155+
CoreTransactionAttemptContext atr) {
158156
if (atr == null) {
159157
atr = AttemptContextReactiveAccessor
160158
.newCoreTranactionAttemptContext(AttemptContextReactiveAccessor.reactive(transactions));
@@ -218,7 +216,7 @@ static final class CoreTransactionAttemptContextBoundCouchbaseClientFactory
218216
// private final Transactions transactions;
219217

220218
CoreTransactionAttemptContextBoundCouchbaseClientFactory(ReactiveCouchbaseResourceHolder transactionResources,
221-
ReactiveCouchbaseClientFactory delegate, Transactions transactions) {
219+
ReactiveCouchbaseClientFactory delegate, Transactions transactions) {
222220
this.transactionResources = transactionResources;
223221
this.delegate = delegate;
224222
// this.transactions = transactions;
@@ -308,7 +306,7 @@ public Mono<ReactiveCouchbaseResourceHolder> getTransactionResources(Transaction
308306

309307
@Override
310308
public ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options,
311-
CoreTransactionAttemptContext atr) {
309+
CoreTransactionAttemptContext atr) {
312310
ReactiveCouchbaseResourceHolder holder = delegate.getTransactionResources(options, atr);
313311
return holder;
314312
}

src/main/java/org/springframework/data/couchbase/config/AbstractCouchbaseConfiguration.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ public ClusterEnvironment couchbaseClusterEnvironment() {
168168
throw new CouchbaseException("non-shadowed Jackson not present");
169169
}
170170
builder.jsonSerializer(JacksonJsonSerializer.create(couchbaseObjectMapper()));
171+
// todo gp only suitable for tests
171172
TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false));
172173
builder.transactionsConfig(transactionsConfig());
173174
configureEnvironment(builder);
@@ -185,15 +186,15 @@ protected void configureEnvironment(final ClusterEnvironment.Builder builder) {
185186

186187
@Bean(name = BeanNames.COUCHBASE_TEMPLATE)
187188
public CouchbaseTemplate couchbaseTemplate(CouchbaseClientFactory couchbaseClientFactory,
188-
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
189-
MappingCouchbaseConverter mappingCouchbaseConverter, TranslationService couchbaseTranslationService) {
189+
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
190+
MappingCouchbaseConverter mappingCouchbaseConverter, TranslationService couchbaseTranslationService) {
190191
return new CouchbaseTemplate(couchbaseClientFactory, reactiveCouchbaseClientFactory, mappingCouchbaseConverter,
191192
couchbaseTranslationService, getDefaultConsistency());
192193
}
193194

194195
public CouchbaseTemplate couchbaseTemplate(CouchbaseClientFactory couchbaseClientFactory,
195-
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
196-
MappingCouchbaseConverter mappingCouchbaseConverter) {
196+
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
197+
MappingCouchbaseConverter mappingCouchbaseConverter) {
197198
return couchbaseTemplate(couchbaseClientFactory, reactiveCouchbaseClientFactory, mappingCouchbaseConverter,
198199
new JacksonTranslationService());
199200
}
@@ -291,7 +292,7 @@ public String typeKey() {
291292
*/
292293
@Bean
293294
public MappingCouchbaseConverter mappingCouchbaseConverter(CouchbaseMappingContext couchbaseMappingContext,
294-
CouchbaseCustomConversions couchbaseCustomConversions) {
295+
CouchbaseCustomConversions couchbaseCustomConversions) {
295296
MappingCouchbaseConverter converter = new MappingCouchbaseConverter(couchbaseMappingContext, typeKey());
296297
converter.setCustomConversions(couchbaseCustomConversions);
297298
return converter;
@@ -346,12 +347,6 @@ public ObjectMapper couchbaseObjectMapper() {
346347

347348
/***** ALL THIS TX SHOULD BE MOVED OUT INTO THE IMPL OF AbstractCouchbaseConfiguration *****/
348349

349-
// todo gp how to DI this into the Cluster creation esp. as it creates a CoreTransactionConfig
350-
// @Bean
351-
// public TransactionsConfig transactionConfig() {
352-
// return TransactionsConfig.builder().build();
353-
// }
354-
355350
@Bean(BeanNames.REACTIVE_COUCHBASE_TRANSACTION_MANAGER)
356351
ReactiveCouchbaseTransactionManager reactiveTransactionManager(
357352
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory) {
@@ -377,11 +372,13 @@ CouchbaseTransactionManager transactionManager(CouchbaseClientFactory clientFact
377372
return new CouchbaseTransactionManager(clientFactory, options);
378373
}
379374

375+
// todo gpx these would be per-transactions options so it seems odd to have a global bean? Surely would want to configure everything at global level instead?
380376
@Bean
381377
public TransactionOptions transactionsOptions(){
382378
return TransactionOptions.transactionOptions();
383379
}
384380

381+
// todo gpx transactions config is now done in standard ClusterConfig - so I think we don't want a separate bean?
385382
public TransactionsConfig.Builder transactionsConfig(){
386383
return TransactionsConfig.builder().durabilityLevel(DurabilityLevel.NONE).timeout(Duration.ofMinutes(20));// for testing
387384
}

0 commit comments

Comments
 (0)