Skip to content

Programmatix datacouch 1145 transaction support #1446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
02a48f8
Porting to SDK-integrated version of transactions
programmatix Apr 26, 2022
a778c6d
Continuing work to get the ExtSDKIntegration port working
programmatix Apr 28, 2022
5012c4e
Added CouchbaseSimpleCallbackTransactionManager, the simplest
programmatix Apr 29, 2022
98b695e
Adding WIP get-and-replace @Transactional support
programmatix May 3, 2022
f3cf815
Transitioning to use CoreTransactionAttemptContext.
programmatix May 17, 2022
f02d2ba
Removing AttemptContextReactiveAccessor
programmatix May 19, 2022
b828694
Removing TransactionsReactive
programmatix May 19, 2022
28b515e
Removing some files not currently used
programmatix May 19, 2022
088efa6
Removing CouchbaseTransactionInterceptor
programmatix May 19, 2022
ffa63ae
Copying @Transactional tests out into separate class
programmatix May 24, 2022
474d61f
Tidyup
programmatix May 24, 2022
f41af12
Tidyup test names
programmatix May 24, 2022
c5952e1
Verify GenericSupport is on same thread before and after transactiona…
programmatix May 24, 2022
7d53985
Refactoring CouchbaseSimpleCallbackTransactionManager ThreadLocalStor…
programmatix May 24, 2022
0eb5048
Using latest java-client
programmatix May 24, 2022
1c067b8
ReactiveReplaceByIdSupport - Fixing use of CAS now have CoreTransacti…
programmatix May 24, 2022
056d1fe
ReactiveInsertByIdSupport - fixing use of reactive vs non-reactive, a…
programmatix May 24, 2022
eb4bb70
Merging upstream
programmatix May 24, 2022
7d18741
Remove incorrect thread check (.doOnNext could execute on a different…
programmatix May 24, 2022
0925a28
Completing merge from upstream
programmatix May 25, 2022
1b0cf1e
Removing unused classes
programmatix May 25, 2022
220b2c6
Give GenericSupport a better name
programmatix May 25, 2022
938b65d
Reject at runtime options that aren't supported in a transaction
programmatix May 26, 2022
56eaf77
Fixing some small todos, partly by removing unused coe
programmatix May 26, 2022
fc75099
Fix runtime option checks
programmatix May 26, 2022
4582b56
Simplifying CouchbaseSimpleCallbackTransactionManager ThreadLocalStorage
programmatix May 26, 2022
c06dcd9
Removing version from CouchbaseDocument
programmatix May 26, 2022
d3e3b39
Improving CouchbaseTransactionalIntegrationTests and adding more tests
programmatix May 26, 2022
5254943
Reject operations that aren't allowed in a transaction (upsertById etc.)
programmatix May 26, 2022
b7fbd81
Improve handling of CAS mismatch
programmatix May 26, 2022
103e993
Removing a now-redundant non-transactional check on upsertById
programmatix May 26, 2022
2458ac7
Support @Transactional options timeout and isolation level
programmatix May 26, 2022
4978cec
Merge branch 'datacouch_1145_transaction_support' of https://github.c…
mikereiche May 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/com/couchbase/client/java/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
* The SDK will only work against Couchbase Server 5.0 and later, because RBAC (role-based access control) is a first
* class concept since 3.0 and therefore required.
*/
// todo gp is this required?
// todo gpx as per discussion with miker - if required, ClusterInterface will be added to the SDK instead
public class Cluster implements ClusterInterface {

/**
Expand Down Expand Up @@ -256,9 +256,9 @@ public static Cluster connect(final String connectionString, final ClusterOption
final ClusterOptions.Built opts = options.build();
final Supplier<ClusterEnvironment> environmentSupplier = extractClusterEnvironment(connectionString, opts);
return new Cluster(
environmentSupplier,
opts.authenticator(),
seedNodesFromConnectionString(connectionString, environmentSupplier.get())
environmentSupplier,
opts.authenticator(),
seedNodesFromConnectionString(connectionString, environmentSupplier.get())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public interface ClusterInterface {

//AnalyticsResult analyticsQuery(String statement);

// AnalyticsResult analyticsQuery(String statement, AnalyticsOptions options);
// AnalyticsResult analyticsQuery(String statement, AnalyticsOptions options);

SearchResult searchQuery(String indexName, SearchQuery query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@
*/
public class AttemptContextReactiveAccessor {

public static ReactiveTransactionAttemptContext getACR(TransactionAttemptContext attemptContext) {
// return attemptContext.ctx();
// todo gp is this access needed. Could hold the raw CoreTransactionAttemptContext instead.
return null;
}

public static ReactiveTransactions reactive(Transactions transactions) {
try {
Field field = Transactions.class.getDeclaredField("reactive");
Expand Down Expand Up @@ -212,26 +206,18 @@ public static TransactionResult run(Transactions transactions, Consumer<Transact
return reactive(transactions).runBlocking(transactionLogic, coreTransactionOptions);
}

// todo gp if needed let's expose in the SDK
// static private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
// StringBuilder sb = new StringBuilder();
// sb.append("library version: ");
// sb.append(TransactionsReactive.class.getPackage().getImplementationVersion());
// sb.append(" config: ");
// sb.append("atrs=");
// sb.append(config.numAtrs());
// sb.append(", metadataCollection=");
// sb.append(config.metadataCollection());
// sb.append(", expiry=");
// sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis());
// sb.append("msecs durability=");
// sb.append(config.durabilityLevel());
// sb.append(" per-txn config=");
// sb.append(" durability=");
// sb.append(perConfig.durabilityLevel());
// sb.append(", supported=");
// sb.append(Supported.SUPPORTED);
// return sb.toString();
// }
CoreTransactionAttemptContext coreTransactionsReactive;
try {
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
field.setAccessible(true);
coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
} catch (Throwable err) {
throw new RuntimeException(err);
}
return coreTransactionsReactive;
}

public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
return new ReactiveTransactionAttemptContext(core, jsonSerializer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public <T> SpringTransactionGetResult<T> findById(String id, Class<T> domainType
CoreTransactionAttemptContext ctx = getContext();
CoreTransactionGetResult getResult = ctx.get( makeCollectionIdentifier(template.getCouchbaseClientFactory().getDefaultCollection().async()) , id).block();

// todo gp getResult.cas() is no longer exposed - required?
T t = template.support().decodeEntity(id, new String(getResult.contentAsBytes()), 0, domainType,
T t = template.support().decodeEntity(id, new String(getResult.contentAsBytes()), getResult.cas(), domainType,
null, null, null);
return new SpringTransactionGetResult<>(t, getResult);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public CoreTransactionGetResult getInner() {
@Override
public String toString() {
return "SpringTransactionGetResult{" +
"value=" + value +
", inner=" + inner +
'}';
"value=" + value +
", inner=" + inner +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public interface CouchbaseClientFactory extends Closeable {
PersistenceExceptionTranslator getExceptionTranslator();

CoreTransactionAttemptContext getCore(TransactionOptions options,
CoreTransactionAttemptContext atr);
CoreTransactionAttemptContext atr);

//CouchbaseClientFactory with(CouchbaseTransactionalOperator txOp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
import com.couchbase.client.java.transactions.config.TransactionOptions;
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
import org.springframework.data.couchbase.transaction.CouchbaseTransactionalOperator;
import org.springframework.data.couchbase.transaction.ReactiveCouchbaseResourceHolder;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -107,8 +106,7 @@ public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*

void close() throws IOException;

ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options,
CoreTransactionAttemptContext ctx);
ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options, CoreTransactionAttemptContext ctx);

/*
* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@ public class SimpleCouchbaseClientFactory implements CouchbaseClientFactory {
//private JsonSerializer serializer = null;

public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
final String bucketName) {
final String bucketName) {
this(connectionString, authenticator, bucketName, null);
}

public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
final String bucketName, final String scopeName) {
final String bucketName, final String scopeName) {
this(new OwnedSupplier<>(Cluster.connect(connectionString, ClusterOptions.clusterOptions(authenticator)
// todo gp disabling cleanupLostAttempts to simplify output during development
.environment(env -> env.transactionsConfig(
TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false)))))),
// todo gp disabling cleanupLostAttempts to simplify output during development
.environment(env -> env.transactionsConfig(
TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false)))))),
bucketName, scopeName);
}

public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
final String bucketName, final String scopeName, final ClusterEnvironment environment) {
final String bucketName, final String scopeName, final ClusterEnvironment environment) {
this(
new OwnedSupplier<>(
Cluster.connect(connectionString, ClusterOptions.clusterOptions(authenticator).environment(environment))),
Expand All @@ -81,7 +81,7 @@ public SimpleCouchbaseClientFactory(final Cluster cluster, final String bucketNa
}

private SimpleCouchbaseClientFactory(final Supplier<Cluster> cluster, final String bucketName,
final String scopeName) {
final String scopeName) {
this.cluster = cluster;
this.bucket = cluster.get().bucket(bucketName);
this.scope = scopeName == null ? bucket.defaultScope() : bucket.scope(scopeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SimpleReactiveCouchbaseClientFactory implements ReactiveCouchbaseCl
CouchbaseTransactionalOperator transactionalOperator;

public SimpleReactiveCouchbaseClientFactory(Cluster cluster, String bucketName, String scopeName,
CouchbaseTransactionalOperator transactionalOperator) {
CouchbaseTransactionalOperator transactionalOperator) {
this.cluster = Mono.just(cluster);
this.theCluster = cluster;
this.bucketName = bucketName;
Expand Down Expand Up @@ -146,15 +146,13 @@ public void close() {
}

@Override
public Mono<ReactiveCouchbaseResourceHolder> getTransactionResources(TransactionOptions options) { // hopefully this
// gets filled in
// later
return Mono.just(new ReactiveCouchbaseResourceHolder(null));
public Mono<ReactiveCouchbaseResourceHolder> getTransactionResources(TransactionOptions options) {
return Mono.just(new ReactiveCouchbaseResourceHolder(null));
}

@Override
public ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options,
CoreTransactionAttemptContext atr) {
CoreTransactionAttemptContext atr) {
if (atr == null) {
atr = AttemptContextReactiveAccessor
.newCoreTranactionAttemptContext(AttemptContextReactiveAccessor.reactive(transactions));
Expand Down Expand Up @@ -218,7 +216,7 @@ static final class CoreTransactionAttemptContextBoundCouchbaseClientFactory
// private final Transactions transactions;

CoreTransactionAttemptContextBoundCouchbaseClientFactory(ReactiveCouchbaseResourceHolder transactionResources,
ReactiveCouchbaseClientFactory delegate, Transactions transactions) {
ReactiveCouchbaseClientFactory delegate, Transactions transactions) {
this.transactionResources = transactionResources;
this.delegate = delegate;
// this.transactions = transactions;
Expand Down Expand Up @@ -308,7 +306,7 @@ public Mono<ReactiveCouchbaseResourceHolder> getTransactionResources(Transaction

@Override
public ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options,
CoreTransactionAttemptContext atr) {
CoreTransactionAttemptContext atr) {
ReactiveCouchbaseResourceHolder holder = delegate.getTransactionResources(options, atr);
return holder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public ClusterEnvironment couchbaseClusterEnvironment() {
throw new CouchbaseException("non-shadowed Jackson not present");
}
builder.jsonSerializer(JacksonJsonSerializer.create(couchbaseObjectMapper()));
// todo gp only suitable for tests
TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false));
builder.transactionsConfig(transactionsConfig());
configureEnvironment(builder);
Expand All @@ -185,15 +186,15 @@ protected void configureEnvironment(final ClusterEnvironment.Builder builder) {

@Bean(name = BeanNames.COUCHBASE_TEMPLATE)
public CouchbaseTemplate couchbaseTemplate(CouchbaseClientFactory couchbaseClientFactory,
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
MappingCouchbaseConverter mappingCouchbaseConverter, TranslationService couchbaseTranslationService) {
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
MappingCouchbaseConverter mappingCouchbaseConverter, TranslationService couchbaseTranslationService) {
return new CouchbaseTemplate(couchbaseClientFactory, reactiveCouchbaseClientFactory, mappingCouchbaseConverter,
couchbaseTranslationService, getDefaultConsistency());
}

public CouchbaseTemplate couchbaseTemplate(CouchbaseClientFactory couchbaseClientFactory,
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
MappingCouchbaseConverter mappingCouchbaseConverter) {
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory,
MappingCouchbaseConverter mappingCouchbaseConverter) {
return couchbaseTemplate(couchbaseClientFactory, reactiveCouchbaseClientFactory, mappingCouchbaseConverter,
new JacksonTranslationService());
}
Expand Down Expand Up @@ -291,7 +292,7 @@ public String typeKey() {
*/
@Bean
public MappingCouchbaseConverter mappingCouchbaseConverter(CouchbaseMappingContext couchbaseMappingContext,
CouchbaseCustomConversions couchbaseCustomConversions) {
CouchbaseCustomConversions couchbaseCustomConversions) {
MappingCouchbaseConverter converter = new MappingCouchbaseConverter(couchbaseMappingContext, typeKey());
converter.setCustomConversions(couchbaseCustomConversions);
return converter;
Expand Down Expand Up @@ -346,12 +347,6 @@ public ObjectMapper couchbaseObjectMapper() {

/***** ALL THIS TX SHOULD BE MOVED OUT INTO THE IMPL OF AbstractCouchbaseConfiguration *****/

// todo gp how to DI this into the Cluster creation esp. as it creates a CoreTransactionConfig
// @Bean
// public TransactionsConfig transactionConfig() {
// return TransactionsConfig.builder().build();
// }

@Bean(BeanNames.REACTIVE_COUCHBASE_TRANSACTION_MANAGER)
ReactiveCouchbaseTransactionManager reactiveTransactionManager(
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory) {
Expand All @@ -377,11 +372,13 @@ CouchbaseTransactionManager transactionManager(CouchbaseClientFactory clientFact
return new CouchbaseTransactionManager(clientFactory, options);
}

// todo gpx these would be per-transactions options so it seems odd to have a global bean? Surely would want to configure everything at global level instead?
@Bean
public TransactionOptions transactionsOptions(){
return TransactionOptions.transactionOptions();
}

// todo gpx transactions config is now done in standard ClusterConfig - so I think we don't want a separate bean?
public TransactionsConfig.Builder transactionsConfig(){
return TransactionsConfig.builder().durabilityLevel(DurabilityLevel.NONE).timeout(Duration.ofMinutes(20));// for testing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public AbstractTemplateSupport(ReactiveCouchbaseTemplate template, CouchbaseConv
abstract ReactiveCouchbaseTemplate getReactiveTemplate();

public <T> T decodeEntityBase(String id, String source, long cas, Class<T> entityClass, String scope, String collection,
TransactionResultHolder txResultHolder, ReactiveCouchbaseResourceHolder holder) {
TransactionResultHolder txResultHolder, ReactiveCouchbaseResourceHolder holder) {
final CouchbaseDocument converted = new CouchbaseDocument(id);
converted.setId(id);

Expand Down Expand Up @@ -127,7 +127,7 @@ CouchbasePersistentEntity couldBePersistentEntity(Class<?> entityClass) {


public <T> T applyResultBase(T entity, CouchbaseDocument converted, Object id, long cas,
TransactionResultHolder txResultHolder, ReactiveCouchbaseResourceHolder holder) {
TransactionResultHolder txResultHolder, ReactiveCouchbaseResourceHolder holder) {
ConvertingPropertyAccessor<Object> accessor = getPropertyAccessor(entity);

final CouchbasePersistentEntity<?> persistentEntity = converter.getMappingContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ public interface CouchbaseOperations extends FluentCouchbaseOperations {
QueryScanConsistency getConsistency();
<T> T save(T entity);

<T> Long count(Query query, Class<T> domainType);
<T> Long count(Query query, Class<T> domainType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ public class CouchbaseTemplate implements CouchbaseOperations, ApplicationContex
private CouchbaseTransactionalOperator couchbaseTransactionalOperator;

public CouchbaseTemplate(final CouchbaseClientFactory clientFactory,
final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory, final CouchbaseConverter converter) {
final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory, final CouchbaseConverter converter) {
this(clientFactory, reactiveCouchbaseClientFactory, converter, new JacksonTranslationService());
}

public CouchbaseTemplate(final CouchbaseClientFactory clientFactory,
final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory, CouchbaseConverter converter,
final TranslationService translationService) {
final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory, CouchbaseConverter converter,
final TranslationService translationService) {
this(clientFactory, reactiveCouchbaseClientFactory, converter, translationService, null);
}

public CouchbaseTemplate(final CouchbaseClientFactory clientFactory,
final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory, final CouchbaseConverter converter,
final TranslationService translationService, QueryScanConsistency scanConsistency) {
final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory, final CouchbaseConverter converter,
final TranslationService translationService, QueryScanConsistency scanConsistency) {
this.clientFactory = clientFactory;
this.converter = converter;
this.templateSupport = new CouchbaseTemplateSupport(this, converter, translationService);
Expand All @@ -91,8 +91,8 @@ public CouchbaseTemplate(final CouchbaseClientFactory clientFactory,
public <T> T save(T entity) {
if (hasNonZeroVersionProperty(entity, templateSupport.converter)) {
return replaceById((Class<T>) entity.getClass()).one(entity);
//} else if (getTransactionalOperator() != null) {
// return insertById((Class<T>) entity.getClass()).one(entity);
//} else if (getTransactionalOperator() != null) {
// return insertById((Class<T>) entity.getClass()).one(entity);
} else {
return upsertById((Class<T>) entity.getClass()).one(entity);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class CouchbaseTemplateSupport extends AbstractTemplateSupport implements Applic
private EntityCallbacks entityCallbacks;

public CouchbaseTemplateSupport(final CouchbaseTemplate template, final CouchbaseConverter converter,
final TranslationService translationService) {
final TranslationService translationService) {
super(template.reactive(), converter, translationService);
this.template = template;
}
Expand All @@ -69,25 +69,25 @@ ReactiveCouchbaseTemplate getReactiveTemplate() {

@Override
public <T> T decodeEntity(String id, String source, long cas, Class<T> entityClass, String scope, String collection,
TransactionResultHolder txHolder) {
TransactionResultHolder txHolder) {
return decodeEntity(id, source, cas, entityClass, scope, collection, txHolder);
}

@Override
public <T> T decodeEntity(String id, String source, long cas, Class<T> entityClass, String scope, String collection,
TransactionResultHolder txHolder, ReactiveCouchbaseResourceHolder holder) {
TransactionResultHolder txHolder, ReactiveCouchbaseResourceHolder holder) {
return decodeEntityBase(id, source, cas, entityClass, scope, collection, txHolder, holder);
}

@Override
public <T> T applyResult(T entity, CouchbaseDocument converted, Object id, long cas,
TransactionResultHolder txResultHolder) {
TransactionResultHolder txResultHolder) {
return applyResult(entity, converted, id, cas,txResultHolder, null);
}

@Override
public <T> T applyResult(T entity, CouchbaseDocument converted, Object id, long cas,
TransactionResultHolder txResultHolder, ReactiveCouchbaseResourceHolder holder) {
TransactionResultHolder txResultHolder, ReactiveCouchbaseResourceHolder holder) {
return applyResultBase(entity, converted, id, cas, txResultHolder, holder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static class ExecutableFindByIdSupport<T> implements ExecutableFindById<T> {
private final ReactiveFindByIdSupport<T> reactiveSupport;

ExecutableFindByIdSupport(CouchbaseTemplate template, Class<T> domainType, String scope, String collection,
GetOptions options, List<String> fields, Duration expiry, CouchbaseTransactionalOperator txCtx) {
GetOptions options, List<String> fields, Duration expiry, CouchbaseTransactionalOperator txCtx) {
this.template = template;
this.domainType = domainType;
this.scope = scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ interface FindByQueryWithOptions<T> extends TerminatingFindByQuery<T>, WithQuery

/**
* To be removed at the next major release. use WithConsistency instead
*
*
* @param <T> the entity type to use for the results.
*/
@Deprecated
Expand Down
Loading