diff --git a/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java b/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java index 7397759e5..7c631eeca 100644 --- a/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java +++ b/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java @@ -66,22 +66,6 @@ public static ReactiveTransactionAttemptContext reactive(TransactionAttemptConte return new ReactiveTransactionAttemptContext(getCore(atr), serializer); } - public static TransactionAttemptContext blocking(ReactiveTransactionAttemptContext atr) { - JsonSerializer serializer; - try { - Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("serializer"); - field.setAccessible(true); - serializer = (JsonSerializer) field.get(atr); - } catch (Throwable err) { - throw new RuntimeException(err); - } - return new TransactionAttemptContext(getCore(atr), serializer); - } - - public static CoreTransactionLogger getLogger(ReactiveTransactionAttemptContext attemptContextReactive) { - return attemptContextReactive.logger(); - } - public static CoreTransactionLogger getLogger(TransactionAttemptContext attemptContextReactive) { return attemptContextReactive.logger(); } @@ -100,6 +84,7 @@ public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(Reac throw new RuntimeException(err); } + // todo gpx options need to be loaded from Cluster and TransactionOptions (from TransactionsWrapper) CoreTransactionOptions perConfig = new CoreTransactionOptions(Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(Duration.ofMinutes(10)), Optional.empty(), Optional.empty()); @@ -115,16 +100,6 @@ public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(Reac return coreTransactionAttemptContext; } - private static Duration now() { - return Duration.of(System.nanoTime(), ChronoUnit.NANOS); - } - - public static ReactiveTransactionAttemptContext from(CoreTransactionAttemptContext coreTransactionAttemptContext, - JsonSerializer serializer) { - TransactionAttemptContext tac = new TransactionAttemptContext(coreTransactionAttemptContext, serializer); - return reactive(tac); - } - public static CoreTransactionAttemptContext getCore(ReactiveTransactionAttemptContext atr) { CoreTransactionAttemptContext coreTransactionsReactive; try { @@ -147,61 +122,11 @@ public static CoreTransactionAttemptContext getCore(TransactionAttemptContext at } } - public static Mono implicitCommit(ReactiveTransactionAttemptContext atr, boolean b) { - CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr); - try { - // getDeclaredMethod() does not find it (because of primitive arg?) - // CoreTransactionAttemptContext.class.getDeclaredMethod("implicitCommit", Boolean.class); - Method[] methods = CoreTransactionAttemptContext.class.getDeclaredMethods(); - Method method = null; - for (Method m : methods) { - if (m.getName().equals("implicitCommit")) { - method = m; - break; - } - } - if (method == null) { - throw new RuntimeException("did not find implicitCommit method"); - } - method.setAccessible(true); - return (Mono) method.invoke(coreTransactionsReactive, b); - } catch (Throwable err) { - throw new RuntimeException(err); - } - - } - - public static AttemptState getState(ReactiveTransactionAttemptContext atr) { - CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr); - try { - Field field = CoreTransactionAttemptContext.class.getDeclaredField("state"); - field.setAccessible(true); - return (AttemptState) field.get(coreTransactionsReactive); - } catch (Throwable err) { - throw new RuntimeException(err); - } - } - public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext( CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) { return new ReactiveTransactionAttemptContext(core, jsonSerializer); } - public static CoreTransactionsReactive getCoreTransactionsReactive(ReactiveTransactions transactions) { - try { - Field field = ReactiveTransactions.class.getDeclaredField("internal"); - field.setAccessible(true); - return (CoreTransactionsReactive) field.get(transactions); - } catch (Throwable err) { - throw new RuntimeException(err); - } - } - - public static TransactionAttemptContext newTransactionAttemptContext(CoreTransactionAttemptContext ctx, - JsonSerializer jsonSerializer) { - return new TransactionAttemptContext(ctx, jsonSerializer); - } - public static TransactionResult run(Transactions transactions, Consumer transactionLogic, CoreTransactionOptions coreTransactionOptions) { return reactive(transactions).runBlocking(transactionLogic, coreTransactionOptions); } diff --git a/src/main/java/com/couchbase/transactions/TransactionsReactive.java b/src/main/java/com/couchbase/transactions/TransactionsReactive.java deleted file mode 100644 index 352135ead..000000000 --- a/src/main/java/com/couchbase/transactions/TransactionsReactive.java +++ /dev/null @@ -1,753 +0,0 @@ -///* -// * Copyright 2021 Couchbase, Inc. -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package com.couchbase.transactions; -// -//import com.couchbase.client.core.annotation.Stability; -//import com.couchbase.client.core.cnc.EventBus; -//import com.couchbase.client.core.retry.reactor.DefaultRetry; -//import com.couchbase.client.core.retry.reactor.Jitter; -//import com.couchbase.client.core.retry.reactor.RetryContext; -//import com.couchbase.client.java.Cluster; -//import com.couchbase.client.java.ReactiveCollection; -//import com.couchbase.client.java.ReactiveScope; -//import com.couchbase.client.java.json.JsonObject; -//import com.couchbase.client.java.query.ReactiveQueryResult; -//import com.couchbase.transactions.cleanup.ClusterData; -//import com.couchbase.transactions.cleanup.TransactionsCleanup; -//import com.couchbase.transactions.components.ATR; -//import com.couchbase.transactions.components.ActiveTransactionRecord; -//import com.couchbase.transactions.config.MergedTransactionConfig; -//import com.couchbase.transactions.config.PerTransactionConfig; -//import com.couchbase.transactions.config.PerTransactionConfigBuilder; -//import com.couchbase.transactions.config.SingleQueryTransactionConfig; -//import com.couchbase.transactions.config.SingleQueryTransactionConfigBuilder; -//import com.couchbase.transactions.config.TransactionConfig; -//import com.couchbase.transactions.deferred.TransactionSerializedContext; -//import com.couchbase.transactions.error.TransactionCommitAmbiguous; -//import com.couchbase.transactions.error.TransactionExpired; -//import com.couchbase.transactions.error.TransactionFailedException; -//import com.couchbase.transactions.error.internal.ErrorClasses; -//import com.couchbase.transactions.error.external.TransactionOperationFailed; -//import com.couchbase.transactions.forwards.Supported; -//import com.couchbase.transactions.log.EventBusPersistedLogger; -//import com.couchbase.transactions.log.PersistedLogWriter; -//import com.couchbase.transactions.log.TransactionLogEvent; -//import com.couchbase.transactions.support.AttemptContextFactory; -//import com.couchbase.transactions.support.AttemptState; -//import com.couchbase.transactions.support.OptionsWrapperUtil; -//import com.couchbase.transactions.util.DebugUtil; -//import reactor.core.publisher.Mono; -//import reactor.core.scheduler.Schedulers; -// -//import java.time.Duration; -//import java.time.temporal.ChronoUnit; -//import java.util.Objects; -//import java.util.Optional; -//import java.util.UUID; -//import java.util.concurrent.TimeUnit; -//import java.util.concurrent.atomic.AtomicReference; -//import java.util.function.Consumer; -//import java.util.function.Function; -//import java.util.function.Predicate; -// -//import static com.couchbase.transactions.error.internal.TransactionOperationFailedBuilder.createError; -//import static com.couchbase.transactions.log.PersistedLogWriter.MAX_LOG_ENTRIES_DEFAULT; -//import static com.couchbase.transactions.support.SpanWrapperUtil.DB_COUCHBASE_TRANSACTIONS; -// -///** -// * An asynchronous version of {@link Transactions}, allowing transactions to be created and run in an asynchronous -// * manner. -// *

-// * The main method to run transactions is {@link TransactionsReactive#run}. -// */ -//public class TransactionsReactive { -// static final int MAX_ATTEMPTS = 1000; -// private final TransactionsCleanup cleanup; -// private final TransactionConfig config; -// private AttemptContextFactory attemptContextFactory; -// private EventBusPersistedLogger persistedLogger; -// -// /** -// * This is package-private. Applications should create a {@link Transactions} object instead, and then call {@link -// * Transactions#reactive}. -// */ -// static TransactionsReactive create(Cluster cluster, TransactionConfig config) { -// return new TransactionsReactive(cluster, config); -// } -// -// private TransactionsReactive(Cluster cluster, TransactionConfig config) { -// Objects.requireNonNull(cluster); -// Objects.requireNonNull(config); -// -// ClusterData clusterData = new ClusterData(cluster); -// this.config = config; -// this.attemptContextFactory = config.attemptContextFactory(); -// MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.empty()); -// cleanup = new TransactionsCleanup(merged, clusterData); -// -// config.persistentLoggingCollection().ifPresent(collection -> { -// PersistedLogWriter persistedLogWriter = new PersistedLogWriter(collection, MAX_LOG_ENTRIES_DEFAULT); -// persistedLogger = new EventBusPersistedLogger(cluster.environment().eventBus(), persistedLogWriter, merged); -// }); -// } -// -// -// /** -// * The main transactions 'engine', responsible for attempting the transaction logic as many times as required, -// * until the transaction commits, is explicitly rolled back, or expires. -// */ -// // TODO: changed from private to public. package-protected plus an accessor would be ok to -// public Mono executeTransaction(MergedTransactionConfig config, -// TransactionContext overall, -// Mono transactionLogic) { -// AtomicReference startTime = new AtomicReference<>(); -// -// return Mono.just(overall) -// -// .subscribeOn(reactor.core.scheduler.Schedulers.elastic()) -// -// .doOnSubscribe(v -> { -// if (startTime.get() == null) startTime.set(System.nanoTime()); -// }) -// -// // Where the magic happens: execute the app's transaction logic -// // A ReactiveTransactionAttemptContext gets created in here. Rollback requires one of these (so it knows what -// // to rollback), so only errors thrown inside this block can trigger rollback. -// // So, expiry checks only get done inside this block. -// .then(transactionLogic) -// -// .flatMap(this::executeImplicitCommit) -// -// // Track an attempt if non-error, and request that the attempt be cleaned up. Similar logic is also -// // done in executeHandleErrorsPreRetry. -// .doOnNext(ctx -> executeAddAttemptAndCleanupRequest(config, overall, ctx)) -// -// // Track an attempt if error, and perform rollback if needed. -// // All errors reaching here must be a `TransactionOperationFailed`. -// .onErrorResume(err -> executeHandleErrorsPreRetry(config, overall, err)) -// -// // This is the main place to retry txns. Feed all errors up to this centralised point. -// // All errors reaching here must be a `TransactionOperationFailed`. -// .retryWhen(executeCreateRetryWhen(overall)) -// -// // If we're here, then we've hit an error that we don't want to retry. -// // Either raise some derivative of TransactionFailedException to the app, or return an ReactiveTransactionAttemptContext -// // to return success (some errors result in success, e.g. TRANSACTION_FAILED_POST_COMMIT) -// // All errors reaching here must be an `ErrorWrapper`. -// .onErrorResume(err -> executeHandleErrorsPostRetry(overall, err)) -// -// .doOnError(err -> { -// if (config.logOnFailure() && !config.logDirectly()) { -// EventBus eventBus = cleanup.clusterData().cluster().environment().eventBus(); -// overall.LOGGER.logs().forEach(log -> { -// eventBus.publish(new TransactionLogEvent(config.logOnFailureLevel(), -// TransactionLogEvent.DEFAULT_CATEGORY, log.toString())); -// }); -// } -// }) -// -// // If we get here, success -// .doOnSuccess(v -> -// overall.LOGGER.info("finished txn in %dus", -// TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime.get())) -// ) -// -// // Safe to do single() as there will only ever be 1 result -// .single() -// .map(v -> createResultFromContext(overall)); -// } -// -// private reactor.util.retry.Retry executeCreateRetryWhen(TransactionContext overall) { -// Predicate> predicate = context -> { -// Throwable exception = context.exception(); -// -// if (!(exception instanceof TransactionOperationFailed)) { -// // A bug. Only TransactionOperationFailed is allowed to reach here. -// throw new IllegalStateException("Non-TransactionOperationFailed '" + DebugUtil.dbg(exception) + "' received during retry, this is a bug", exception); -// } -// -// TransactionOperationFailed e = (TransactionOperationFailed) exception; -// -// overall.LOGGER.info("TransactionOperationFailed retryTransaction=%s", e.retryTransaction()); -// -// return e.retryTransaction(); -// }; -// -// return DefaultRetry.create(predicate) -// -// .exponentialBackoff(Duration.of(1, ChronoUnit.MILLIS), -// Duration.of(2, ChronoUnit.MILLIS)) -// -// .doOnRetry(v -> overall.LOGGER.info("<>", "retrying transaction after backoff %dmillis", v.backoff().toMillis())) -// -// // Add some jitter so two txns don't livelock each other -// .jitter(Jitter.random()) -// -// // Really, this is a safety-guard. The txn will be aborted when it expires. -// .retryMax(MAX_ATTEMPTS) -// -// .toReactorRetry(); -// } -// -// private Mono executeHandleErrorsPreRetry(MergedTransactionConfig config, -// TransactionContext overall, Throwable err) { -// if (!(err instanceof TransactionOperationFailed)) { -// // A bug. Only TransactionOperationFailed is allowed to reach here. -// overall.LOGGER.warn("<>", "received non-TransactionOperationFailed error %s, unable to rollback as don't have " + -// "context", DebugUtil.dbg(err)); -// return Mono.error(new IllegalStateException("received non-TransactionOperationFailed error " + err.getClass().getName() + " in pre-retry", err)); -// } -// -// Mono autoRollback = Mono.empty(); -// Mono cleanupReq = Mono.empty(); -// -// TransactionOperationFailed e = (TransactionOperationFailed) err; -// ReactiveTransactionAttemptContext ctx = e.context(); -// -// overall.LOGGER.info("<>", "finishing attempt off after error '%s'", e); -// -// if (e.autoRollbackAttempt()) { -// // In queryMode we always ROLLBACK, as there is possibly delta table state to cleanup, and there may be an -// // ATR - we don't know -// if (ctx.state() == AttemptState.NOT_STARTED && !ctx.queryMode()) { -// // This is a better way of doing [RETRY-ERR-NOATR] and likely means that the older logic for -// // handling that won't trigger now -// ctx.LOGGER.info(ctx.attemptId(), "told to auto-rollback but in NOT_STARTED state, so nothing to do - skipping rollback"); -// } -// else { -// ctx.LOGGER.info(ctx.attemptId(), "auto-rolling-back on error"); -// -// autoRollback = ctx.rollbackInternal(false); -// } -// } else { -// ctx.LOGGER.info(ctx.attemptId(), "has been told to skip auto-rollback"); -// } -// -// if (!config.runRegularAttemptsCleanupThread()) { -// // Don't add a request to a queue that no-one will be processing -// ctx.LOGGER.trace(ctx.attemptId(), "skipping addition of cleanup request on failure as regular cleanup disabled"); -// } -// else { -// cleanupReq = Mono.fromRunnable(() -> addCleanupRequestForContext(ctx)); -// } -// -// Mono addAttempt = Mono.fromRunnable(() -> { -// TransactionAttempt ta = TransactionAttempt.createFromContext(ctx, Optional.of(err)); -// overall.addAttempt(ta); -// ctx.LOGGER.info(ctx.attemptId(), "added attempt %s after error", ta); -// }); -// -// final Mono cleanupReqForLambda = cleanupReq; -// -// return autoRollback -// // See [Primary Operations] section in design document -// .onErrorResume(er -> { -// overall.LOGGER.info("<>", "rollback failed with %s, raising original error but with retryTransaction turned off", -// DebugUtil.dbg(er)); -// -// // Still want to add attempt and cleanup request -// return cleanupReqForLambda -// .then(addAttempt) -// .then(Mono.error(createError(e.context(), e.causingErrorClass()) -// .raiseException(e.toRaise()) -// .cause(e.getCause()) -// .build())); -// }) -// .then(cleanupReqForLambda) -// // Only want to add the attempt after doing the rollback, so the attempt has the correct state (hopefully -// // ROLLED_BACK) -// .then(addAttempt) -// .then(Mono.defer(() -> { -// if (e.retryTransaction() && overall.hasExpiredClientSide()) { -// overall.LOGGER.info("<>", "original error planned to retry transaction, but it has subsequently expired"); -// return Mono.error(createError(ctx, ErrorClasses.FAIL_EXPIRY) -// .doNotRollbackAttempt() -// .raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED) -// .build()); -// } -// else { -// // Raise the error up the stack so the logic later can decide whether to retry the transaction -// overall.LOGGER.info("<>", "reraising original exception %s", DebugUtil.dbg(err)); -// return Mono.error(err); -// } -// })) -// .doFinally(v -> ctx.span().failWith(e)) -// .thenReturn(ctx); -// } -// -// private Mono executeHandleErrorsPostRetry(TransactionContext overall, Throwable err) { -// if (!(err instanceof TransactionOperationFailed)) { -// // A bug. Only TransactionOperationFailed is allowed to reach here. -// return Mono.error(new IllegalStateException("Non-TransactionOperationFailed '" + DebugUtil.dbg(err) + "' received, this is a bug")); -// } -// -// TransactionResult result = createResultFromContext(overall); -// TransactionOperationFailed e = (TransactionOperationFailed) err; -// -// if (e.toRaise() == TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT) { -// e.context().LOGGER.info(e.context().attemptId(), "converted TRANSACTION_FAILED_POST_COMMIT to success, unstagingComplete() will be false"); -// -// return Mono.just(e.context()); -// } -// else { -// TransactionFailedException ret; -// -// switch (e.toRaise()) { -// case TRANSACTION_EXPIRED: { -// String msg = "Transaction has expired configured timeout of " + overall.expirationTime().toMillis() + "msecs. The transaction is not committed."; -// ret = new TransactionExpired(e.getCause(), result, msg); -// break; -// } -// case TRANSACTION_COMMIT_AMBIGUOUS: { -// String msg = "It is ambiguous whether the transaction committed"; -// ret = new TransactionCommitAmbiguous(e.getCause(), result, msg); -// break; -// } -// default: -// ret = new TransactionFailedException(e.getCause(), result); -// break; -// } -// -// e.context().LOGGER.info(e.context().attemptId(), "converted TransactionOperationFailed %s to final error %s", -// e.toRaise(), ret); -// -// return Mono.error(ret); -// } -// } -// -// private void executeAddAttemptAndCleanupRequest(MergedTransactionConfig config, TransactionContext overall, -// ReactiveTransactionAttemptContext ctx) { -// TransactionAttempt ta = TransactionAttempt.createFromContext(ctx, Optional.empty()); -// overall.addAttempt(ta); -// ctx.LOGGER.info(ctx.attemptId(), "added attempt %s after success", ta); -// -// if (config.runRegularAttemptsCleanupThread()) { -// addCleanupRequestForContext(ctx); -// } else { -// ctx.LOGGER.trace(ctx.attemptId(), "skipping addition of cleanup request on success"); -// } -// -// ctx.span().finish(); -// } -// -// private Mono executeImplicitCommit(ReactiveTransactionAttemptContext ctx) { -// return Mono.defer(() -> { -// // If app has not explicitly performed a commit, assume they want to do so anyway -// if (!ctx.isDone()) { -// if (ctx.serialized().isPresent()) { -// return Mono.just(ctx); -// } else { -// ctx.LOGGER.trace(ctx.attemptId(), "doing implicit commit"); -// -// return ctx.commit() -// .then(Mono.just(ctx)) -// .onErrorResume(err -> Mono.error(TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx))); -// } -// } else { -// return Mono.just(ctx); -// } -// }); -// } -// -// // TODO: changed from package-protected to public (could have just used an accessor class in same package) -// public ReactiveTransactionAttemptContext createAttemptContext(TransactionContext overall, -// MergedTransactionConfig config, -// String attemptId) { -// // null only happens in testing with Mockito, harmless -// if (overall != null) { -// return attemptContextFactory.create(overall, config, attemptId, this, Optional.of(overall.span())); -// } else { -// return null; -// } -// } -// -// /** -// * Runs the supplied transactional logic until success or failure. -// *

-// * This is the asynchronous version of {@link Transactions#run}, so to cover the differences: -// *

    -// *
  • The transaction logic is supplied with a {@link ReactiveTransactionAttemptContext}, which contains asynchronous -// * methods to allow it to read, mutate, insert and delete documents, as well as commit or rollback the -// * transactions.
  • -// *
  • The transaction logic should run these methods as a Reactor chain.
  • -// *
  • The transaction logic should return a Mono{@literal <}Void{@literal >}. Any -// * Flux or Mono can be converted to a Mono{@literal <}Void{@literal >} by -// * calling .then() on it.
  • -// *
  • This method returns a Mono{@literal <}TransactionResult{@literal >}, which should be handled -// * as a normal Reactor Mono.
  • -// *
-// * -// * @param transactionLogic the application's transaction logic -// * @param perConfig the configuration to use for this transaction -// * @return there is no need to check the returned {@link TransactionResult}, as success is implied by the lack of a -// * thrown exception. It contains information useful only for debugging and logging. -// * @throws TransactionFailedException or a derived exception if the transaction fails to commit for any reason, possibly -// * after multiple retries. The exception contains further details of the error. Not -// */ -// public Mono run(Function> transactionLogic, -// PerTransactionConfig perConfig) { -// return Mono.defer(() -> { -// MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.of(perConfig)); -// -// TransactionContext overall = -// new TransactionContext(cleanup.clusterData().cluster().environment().requestTracer(), -// cleanup.clusterData().cluster().environment().eventBus(), -// UUID.randomUUID().toString(), -// now(), -// Duration.ZERO, -// merged); -// AtomicReference startTime = new AtomicReference<>(0L); -// -// Mono ob = Mono.fromCallable(() -> { -// String txnId = UUID.randomUUID().toString(); -// overall.LOGGER.info(configDebug(config, perConfig)); -// return createAttemptContext(overall, merged, txnId); -// }).flatMap(ctx -> { -// ctx.LOGGER.info("starting attempt %d/%s/%s", -// overall.numAttempts(), ctx.transactionId(), ctx.attemptId()); -// Mono result = transactionLogic.apply(ctx); -// return result -// .onErrorResume(err -> { -// ctx.LOGGER.info(ctx.attemptId(), "caught exception '%s' in async, rethrowing", err); -// logElidedStacktrace(ctx, err); -// -// return Mono.error(TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx)); -// }) -// .thenReturn(ctx); -// }).doOnSubscribe(v -> startTime.set(System.nanoTime())) -// .doOnNext(v -> v.LOGGER.trace(v.attemptId(), "finished attempt %d in %sms", -// overall.numAttempts(), (System.nanoTime() - startTime.get()) / 1_000_000)); -// -// return executeTransaction(merged, overall, ob) -// .doOnNext(v -> overall.span().finish()) -// .doOnError(err -> overall.span().failWith(err)); -// }); -// } -// -// // Printing the stacktrace is expensive in terms of log noise, but has been a life saver on many debugging -// // encounters. Strike a balance by eliding the more useless elements. -// // TODO: changed from private to public -// public void logElidedStacktrace(ReactiveTransactionAttemptContext ctx, Throwable err) { -// DebugUtil.fetchElidedStacktrace(err, (s) -> ctx.LOGGER.info(ctx.attemptId(), " " + s.toString())); -// } -// -// // TODO: changed from private to public -// public static String configDebug(TransactionConfig config, PerTransactionConfig perConfig) { -// StringBuilder sb = new StringBuilder(); -// sb.append("library version: "); -// sb.append(TransactionsReactive.class.getPackage().getImplementationVersion()); -// sb.append(" config: "); -// sb.append("atrs="); -// sb.append(config.numAtrs()); -// sb.append(", metadataCollection="); -// sb.append(config.metadataCollection()); -// sb.append(", expiry="); -// sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis()); -// sb.append("msecs durability="); -// sb.append(config.durabilityLevel()); -// sb.append(" per-txn config="); -// sb.append(" durability="); -// sb.append(perConfig.durabilityLevel()); -// sb.append(", supported="); -// sb.append(Supported.SUPPORTED); -// return sb.toString(); -// } -// -// /** -// * Convenience overload that runs {@link TransactionsReactive#run} with a default PerTransactionConfig. -// */ -// public Mono run(Function> transactionLogic) { -// return run(transactionLogic, PerTransactionConfigBuilder.create().build()); -// } -// -// @Stability.Volatile -// public Mono commit(TransactionSerializedContext serialized, PerTransactionConfig perConfig) { -// return deferred(serialized, -// perConfig, -// // Nothing to actually do, just want the implicit commit -// (ctx) -> Mono.empty()); -// } -// -// @Stability.Volatile -// public Mono rollback(TransactionSerializedContext serialized, PerTransactionConfig perConfig) { -// return deferred(serialized, -// perConfig, -// (ctx) -> ctx.rollback()); -// } -// -// @Stability.Volatile -// private Mono deferred(TransactionSerializedContext serialized, -// PerTransactionConfig perConfig, -// Function> initial) { -// MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.of(perConfig)); -// JsonObject hydrated = JsonObject.fromJson(serialized.encodeAsString()); -// -// String atrBucket = hydrated.getString("atrBucket"); -// String atrScope = hydrated.getString("atrScope"); -// String atrCollectionName = hydrated.getString("atrCollection"); -// String atrId = hydrated.getString("atrId"); -// ReactiveCollection atrCollection = cleanup.clusterData() -// .getBucketFromName(atrBucket) -// .scope(atrScope) -// .collection(atrCollectionName); -// -// return ActiveTransactionRecord.getAtr(atrCollection, -// atrId, -// OptionsWrapperUtil.kvTimeoutNonMutating(merged, atrCollection.core()), -// null) -// -// .flatMap(atrOpt -> { -// if (!atrOpt.isPresent()) { -// return Mono.error(new IllegalStateException(String.format("ATR %s/%s could not be found", -// atrBucket, atrId))); -// } -// else { -// ATR atr = atrOpt.get(); -// -// // Note startTimeServerMillis is written with ${Mutation.CAS} while currentTimeServer -// // could have come from $vbucket.HLC and is hence one-second granularity. So, this is a -// // somewhat imperfect comparison. -// Duration currentTimeServer = Duration.ofNanos(atr.cas()); -// Duration startTimeServer = Duration.ofMillis(hydrated.getLong("startTimeServerMillis")); -// -// // This includes the time elapsed during the first part of the transaction, plus any time -// // elapsed during the period the transaction was expired. Total time since the transaction -// // began, basically. -// Duration timeElapsed = currentTimeServer.minus(startTimeServer); -// -// TransactionContext overall = -// new TransactionContext(cleanup.clusterData().cluster().environment().requestTracer(), -// cleanup.clusterData().cluster().environment().eventBus(), -// UUID.randomUUID().toString(), -// Duration.ofNanos(System.nanoTime()), -// timeElapsed, -// merged); -// AtomicReference startTime = new AtomicReference<>(0L); -// -// overall.LOGGER.info("elapsed time = %dmsecs (ATR start time %dmsecs, current ATR time %dmsecs)", -// timeElapsed.toMillis(), startTimeServer.toMillis(), currentTimeServer.toMillis()); -// -// Mono ob = Mono.defer(() -> { -// ReactiveTransactionAttemptContext ctx = attemptContextFactory.createFrom(hydrated, overall, merged, this); -// ctx.LOGGER.info("starting attempt %d/%s/%s", -// overall.numAttempts(), ctx.transactionId(), ctx.attemptId()); -// ctx.LOGGER.info(configDebug(config, perConfig)); -// -// return initial.apply(ctx) -// -// // TXNJ-50: Make sure we run user's blocking logic on a scheduler that can take it -// .subscribeOn(Schedulers.elastic()) -// -// .onErrorResume(err -> { -// ctx.LOGGER.info(ctx.attemptId(), "caught exception '%s' in deferred, rethrowing", -// err); -// -// logElidedStacktrace(ctx, err); -// -// return Mono.error(TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx)); -// }) -// -// .doOnSubscribe(v -> startTime.set(System.nanoTime())) -// -// .doOnNext(v -> { -// ctx.LOGGER.trace(ctx.attemptId(), "finished attempt %d in %sms", -// overall.numAttempts(), (System.nanoTime() - startTime.get()) / 1_000_000); -// }) -// -// .thenReturn(ctx); -// }); -// -// return executeTransaction(merged, overall, ob) -// .doOnNext(v -> overall.span().attribute(DB_COUCHBASE_TRANSACTIONS + "retries", overall.numAttempts()).finish()) -// .doOnError(err -> overall.span().attribute(DB_COUCHBASE_TRANSACTIONS + "retries", overall.numAttempts()).failWith(err)); -// } -// }); -// } -// -// Mono runBlocking(Consumer txnLogic, PerTransactionConfig perConfig) { -// return Mono.defer(() -> { -// MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.of(perConfig)); -// TransactionContext overall = -// new TransactionContext(cleanup.clusterData().cluster().environment().requestTracer(), -// cleanup.clusterData().cluster().environment().eventBus(), -// UUID.randomUUID().toString(), -// now(), -// Duration.ZERO, -// merged); -// AtomicReference startTime = new AtomicReference<>(0L); -// overall.LOGGER.info(configDebug(config, perConfig)); -// -// Mono ob = Mono.defer(() -> { -// String txnId = UUID.randomUUID().toString(); -// ReactiveTransactionAttemptContext ctx = createAttemptContext(overall, merged, txnId); -// TransactionAttemptContext ctxBlocking = new TransactionAttemptContext(ctx); -// ctx.LOGGER.info("starting attempt %d/%s/%s", -// overall.numAttempts(), ctx.transactionId(), ctx.attemptId()); -// -// return Mono.fromRunnable(() -> txnLogic.accept(ctxBlocking)) -// -// // TXNJ-50: Make sure we run user's blocking logic on a scheduler that can take it -// .subscribeOn(Schedulers.elastic()) -// -// .onErrorResume(err -> { -// ctx.LOGGER.info(ctx.attemptId(), "caught exception '%s' in runBlocking, rethrowing", err); -// -// logElidedStacktrace(ctx, err); -// -// return Mono.error(TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx)); -// }) -// -// .doOnSubscribe(v -> startTime.set(System.nanoTime())) -// -// .doOnNext(v -> { -// ctx.LOGGER.trace(ctx.attemptId(), "finished attempt %d in %sms", -// overall.numAttempts(), (System.nanoTime() - startTime.get()) / 1_000_000); -// }) -// -// .thenReturn(ctx); -// }); -// -// return executeTransaction(merged, overall, ob) -// .doOnNext(v -> overall.span().attribute(DB_COUCHBASE_TRANSACTIONS + "retries", overall.numAttempts()).finish()) -// .doOnError(err -> overall.span().attribute(DB_COUCHBASE_TRANSACTIONS + "retries", overall.numAttempts()).failWith(err)); -// }); -// } -// -// public TransactionConfig config() { -// return config; -// } -// -// private static Duration now() { -// return Duration.of(System.nanoTime(), ChronoUnit.NANOS); -// } -// -// TransactionsCleanup cleanup() { -// return cleanup; -// } -// -// private void addCleanupRequestForContext(ReactiveTransactionAttemptContext ctx) { -// // Whether the txn was successful or not, still want to clean it up -// if (ctx.queryMode()) { -// ctx.LOGGER.info(ctx.attemptId(), "Skipping cleanup request as in query mode"); -// } -// else if (ctx.serialized().isPresent()) { -// ctx.LOGGER.info(ctx.attemptId(), "Skipping cleanup request as deferred transaction"); -// } -// else if (ctx.atrId().isPresent() && ctx.atrCollection().isPresent()) { -// switch (ctx.state()) { -// case NOT_STARTED: -// case COMPLETED: -// case ROLLED_BACK: -// ctx.LOGGER.trace(ctx.attemptId(), "Skipping addition of cleanup request in state %s", ctx.state()); -// break; -// default: -// ctx.LOGGER.trace(ctx.attemptId(), "Adding cleanup request for %s/%s", -// ctx.atrCollection().get().name(), ctx.atrId().get()); -// -// cleanup.add(ctx.createCleanupRequest()); -// } -// } else { -// // No ATR entry to remove -// ctx.LOGGER.trace(ctx.attemptId(), "Skipping cleanup request as no ATR entry to remove (due to no " + -// "mutations)"); -// } -// } -// -// private static TransactionResult createResultFromContext(TransactionContext overall) { -// return new TransactionResult(overall.attempts(), -// overall.LOGGER, -// Duration.of(System.nanoTime() - overall.startTimeClient().toNanos(), ChronoUnit.NANOS), -// overall.transactionId(), -// overall.serialized()); -// } -// -// /** -// * Performs a single query transaction, with default configuration. -// * -// * @param statement the statement to execute. -// * @return a ReactiveSingleQueryTransactionResult -// */ -// @Stability.Uncommitted -// public Mono query(String statement) { -// return query(null, statement, SingleQueryTransactionConfigBuilder.create().build()); -// } -// -// /** -// * Performs a single query transaction, with a custom configuration. -// * -// * @param statement the statement to execute. -// * @param queryOptions configuration options. -// * @return a ReactiveSingleQueryTransactionResult -// */ -// @Stability.Uncommitted -// public Mono query(String statement, SingleQueryTransactionConfig queryOptions) { -// return query(null, statement, queryOptions); -// } -// -// /** -// * Performs a single query transaction, with a scope context and default configuration. -// * -// * @param statement the statement to execute. -// * @param scope the query will be executed in the context of this scope, so it can refer to a collection on this scope -// * rather than needed to provide the full keyspace. -// * @return a ReactiveSingleQueryTransactionResult -// */ -// @Stability.Uncommitted -// public Mono query(ReactiveScope scope, String statement) { -// return query(scope, statement, SingleQueryTransactionConfigBuilder.create().build()); -// } -// -// /** -// * Performs a single query transaction, with a scope context and custom configuration. -// * -// * @param statement the statement to execute. -// * @param scope the query will be executed in the context of this scope, so it can refer to a collection on this scope -// * rather than needed to provide the full keyspace. -// * @param queryOptions configuration options. -// * @return a ReactiveSingleQueryTransactionResult -// */ -// @Stability.Uncommitted -// public Mono query(ReactiveScope scope, String statement, SingleQueryTransactionConfig queryOptions) { -// return Mono.defer(() -> { -// AtomicReference queryResult = new AtomicReference<>(); -// return run((ctx) -> ctx.query(scope, statement, queryOptions.queryOptions(), true) -// .doOnNext(qr -> queryResult.set(qr)) -// .then(), queryOptions.convert()) -// .map(result -> new ReactiveSingleQueryTransactionResult(result.log(), queryResult.get())); -// }); -// } -// -// @Stability.Internal -// @Deprecated // Prefer setting TransactionConfigBuilder#testFactories now -// public void setAttemptContextFactory(AttemptContextFactory factory) { -// this.attemptContextFactory = factory; -// } -// public ReactiveTransactionAttemptContext newAttemptContextReactive(){ -// PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build(); -// MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.of(perConfig)); -// -// TransactionContext overall = new TransactionContext( -// cleanup().clusterData().cluster().environment().requestTracer(), -// cleanup().clusterData().cluster().environment().eventBus(), -// UUID.randomUUID().toString(), now(), Duration.ZERO, merged); -// -// String txnId = UUID.randomUUID().toString(); -// overall.LOGGER.info(configDebug(config, perConfig)); -// return createAttemptContext(overall, merged, txnId); -// } -// -//} diff --git a/src/main/java/com/example/demo/CouchbaseTransactionManager.pre-core b/src/main/java/com/example/demo/CouchbaseTransactionManager.pre-core deleted file mode 100644 index ab9d84087..000000000 --- a/src/main/java/com/example/demo/CouchbaseTransactionManager.pre-core +++ /dev/null @@ -1,201 +0,0 @@ -package com.example.demo; - -import java.util.concurrent.atomic.AtomicReference; - -import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext; -import com.couchbase.client.java.transactions.TransactionAttemptContext; -import com.couchbase.client.java.transactions.AttemptContextReactiveAccessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.data.couchbase.core.CouchbaseTemplate; -import org.springframework.data.couchbase.transaction.ClientSession; -import org.springframework.data.couchbase.transaction.ClientSessionImpl; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.support.AbstractPlatformTransactionManager; -import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager; -import org.springframework.transaction.support.DefaultTransactionStatus; -import org.springframework.transaction.support.ResourceHolderSupport; -import org.springframework.transaction.support.ResourceTransactionManager; -import org.springframework.transaction.support.SmartTransactionObject; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionSynchronizationManager; -import org.springframework.transaction.support.TransactionSynchronizationUtils; -import org.springframework.util.Assert; - -// todo gp why is there separate CouchbaseCallbackTransactionManager if this class also extends CallbackPreferringPlatformTransactionManager? -// todo gp there is another CouchbaseTransactionManager in another package, which is valid? -public class CouchbaseTransactionManager extends AbstractPlatformTransactionManager - implements DisposableBean, ResourceTransactionManager, CallbackPreferringPlatformTransactionManager { - - private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTransactionManager.class); - - private final CouchbaseTemplate template; - - public CouchbaseTransactionManager(CouchbaseTemplate template) { - this.template = template; - } - - public CouchbaseTransactionalTemplate template() { - return new CouchbaseTransactionalTemplate(template); - } - - @Override - public T execute(TransactionDefinition definition, TransactionCallback callback) throws TransactionException { - final AtomicReference result = new AtomicReference<>(); - // todo gp like CouchbaseCallbackTransactionManager, it needs access to CouchbaseClientFactory here (Cluster) -// TransactionResult txnResult = transactions.run(attemptContext -> { -// -// if (TransactionSynchronizationManager.hasResource(template.getCouchbaseClientFactory())) { -// ((CouchbaseResourceHolder) TransactionSynchronizationManager -// .getResource(template.reactive().getCouchbaseClientFactory())) -// .setAttemptContext(attemptContext); -// } else { -// TransactionSynchronizationManager.bindResource( -// template.reactive().getCouchbaseClientFactory(), -// new CouchbaseResourceHolder(attemptContext) -// ); -// } -// -// try { -// // Since we are on a different thread now transparently, at least make sure -// // that the original method invocation is synchronized. -// synchronized (this) { -// result.set(callback.doInTransaction(null)); -// } -// } catch (RuntimeException e) { -// System.err.println("RuntimeException: "+e+" instanceof RuntimeException "+(e instanceof RuntimeException)); -// throw e; -// } catch (Throwable e) { -// System.err.println("RuntimeException: "+e+" instanceof "+(e instanceof Throwable)); -// throw new RuntimeException(e); -// } -// }); - -// LOGGER.debug("Completed Couchbase Transaction with Result: " + txnResult); - return result.get(); - } - - @Override - protected CouchbaseTransactionObject doGetTransaction() throws TransactionException { - CouchbaseResourceHolder resourceHolder = (CouchbaseResourceHolder) TransactionSynchronizationManager - .getResource(template.getCouchbaseClientFactory()); - return new CouchbaseTransactionObject(resourceHolder); - } - - @Override - protected boolean isExistingTransaction(Object transaction) throws TransactionException { - return extractTransaction(transaction).hasResourceHolder(); - } - - @Override - protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { - LOGGER.debug("Beginning Couchbase Transaction with Definition {}", definition); - } - - @Override - protected void doCommit(DefaultTransactionStatus status) throws TransactionException { - LOGGER.debug("Committing Couchbase Transaction with status {}", status); - } - - @Override - protected void doRollback(DefaultTransactionStatus status) throws TransactionException { - LOGGER.warn("Rolling back Couchbase Transaction with status {}", status); - } - - @Override - protected void doCleanupAfterCompletion(Object transaction) { - LOGGER.trace("Performing cleanup of Couchbase Transaction {}", transaction); - } - - @Override - public void destroy() { - } - - @Override - public Object getResourceFactory() { - return template.getCouchbaseClientFactory(); - } - - private static CouchbaseTransactionObject extractTransaction(Object transaction) { - Assert.isInstanceOf(CouchbaseTransactionObject.class, transaction, - () -> String.format("Expected to find a %s but it turned out to be %s.", CouchbaseTransactionObject.class, - transaction.getClass())); - - return (CouchbaseTransactionObject) transaction; - } - - public static class CouchbaseResourceHolder extends ResourceHolderSupport { - - private volatile TransactionAttemptContext attemptContext; - private volatile ReactiveTransactionAttemptContext attemptContextReactive; - private volatile ClientSession session = new ClientSessionImpl(); - - public CouchbaseResourceHolder(TransactionAttemptContext attemptContext) { - this.attemptContext = attemptContext; - } - - public TransactionAttemptContext getAttemptContext() { - return attemptContext; - } - - public void setAttemptContext(TransactionAttemptContext attemptContext) { - this.attemptContext = attemptContext; - } - - public ReactiveTransactionAttemptContext getAttemptContextReactive() { - return attemptContext!= null ? AttemptContextReactiveAccessor.getACR(attemptContext) : attemptContextReactive; - } - public void setAttemptContextReactive(ReactiveTransactionAttemptContext attemptContextReactive) { - this.attemptContextReactive = attemptContextReactive; - } - - public ClientSession getSession() { - return session; - } - - public void setSession(ClientSession session){ - this.session = session; - } - - @Override - public String toString() { - return "CouchbaseResourceHolder{" + - "attemptContext=" + attemptContext + - '}'; - } - - } - - protected static class CouchbaseTransactionObject implements SmartTransactionObject { - - final CouchbaseResourceHolder resourceHolder; - - CouchbaseTransactionObject(CouchbaseResourceHolder resourceHolderIn) { - resourceHolder = resourceHolderIn; - } - - @Override - public boolean isRollbackOnly() { - return resourceHolder != null && resourceHolder.isRollbackOnly(); - } - - @Override - public void flush() { - TransactionSynchronizationUtils.triggerFlush(); - } - - public boolean hasResourceHolder() { - return resourceHolder != null; - } - - @Override - public String toString() { - return "CouchbaseTransactionObject{" + - "resourceHolder=" + resourceHolder + - '}'; - } - } - -} diff --git a/src/main/java/com/example/demo/CouchbaseTransactionalTemplate.java b/src/main/java/com/example/demo/CouchbaseTransactionalTemplate.java deleted file mode 100644 index c0098a9d8..000000000 --- a/src/main/java/com/example/demo/CouchbaseTransactionalTemplate.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.example.demo; - -import com.couchbase.client.core.transaction.CoreTransactionAttemptContext; -import com.couchbase.client.core.transaction.CoreTransactionGetResult; -import com.couchbase.client.java.codec.Transcoder; -import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext; -import com.couchbase.client.java.transactions.TransactionAttemptContext; -import com.couchbase.client.java.transactions.TransactionGetResult; -import org.springframework.data.couchbase.core.CouchbaseTemplate; -import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; -import org.springframework.data.couchbase.transaction.ReactiveCouchbaseResourceHolder; -import org.springframework.transaction.support.TransactionSynchronizationManager; - -import static com.couchbase.client.java.transactions.internal.ConverterUtil.makeCollectionIdentifier; - -public class CouchbaseTransactionalTemplate { - - private final CouchbaseTemplate template; - - public CouchbaseTransactionalTemplate(CouchbaseTemplate template) { - this.template = template; - } - - public SpringTransactionGetResult findById(String id, Class domainType) { - try { - CoreTransactionAttemptContext ctx = getContext(); - CoreTransactionGetResult getResult = ctx.get( makeCollectionIdentifier(template.getCouchbaseClientFactory().getDefaultCollection().async()) , id).block(); - - T t = template.support().decodeEntity(id, new String(getResult.contentAsBytes()), getResult.cas(), domainType, - null, null, null); - return new SpringTransactionGetResult<>(t, getResult); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - - } - - public void replaceById(CoreTransactionGetResult getResult, T entity) { - CoreTransactionAttemptContext ctx = getContext(); - Transcoder transCoder = template.getCouchbaseClientFactory().getCluster().environment().transcoder(); - Transcoder.EncodedValue encoded = transCoder.encode(template.support().encodeEntity(entity).export()); - ctx.replace(getResult, encoded.encoded()); - } - - private CoreTransactionAttemptContext getContext() { - ReactiveCouchbaseResourceHolder resource = (ReactiveCouchbaseResourceHolder) TransactionSynchronizationManager - .getResource(template.getCouchbaseClientFactory()); - CoreTransactionAttemptContext atr; - if (resource != null) { - atr = resource.getCore(); - } else { - ReactiveCouchbaseResourceHolder holder = (ReactiveCouchbaseResourceHolder) TransactionSynchronizationManager - .getResource(template.getCouchbaseClientFactory().getCluster()); - atr = holder.getCore(); - } - return atr; - } - - - public static ReactiveCouchbaseResourceHolder getSession(ReactiveCouchbaseTemplate template) { - ReactiveCouchbaseResourceHolder resource = (ReactiveCouchbaseResourceHolder) TransactionSynchronizationManager - .getResource(template.getCouchbaseClientFactory()); - return resource; - } - -} diff --git a/src/main/java/com/example/demo/SpringTransactionGetResult.java b/src/main/java/com/example/demo/SpringTransactionGetResult.java deleted file mode 100644 index 27ede4aaf..000000000 --- a/src/main/java/com/example/demo/SpringTransactionGetResult.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.example.demo; - -import com.couchbase.client.core.transaction.CoreTransactionGetResult; -import com.couchbase.client.java.transactions.TransactionGetResult; - -public class SpringTransactionGetResult { - - private final T value; - private final CoreTransactionGetResult inner; - - public SpringTransactionGetResult(T value, CoreTransactionGetResult inner) { - this.value = value; - this.inner = inner; - } - - public T getValue() { - return value; - } - - public CoreTransactionGetResult getInner() { - return inner; - } - - @Override - public String toString() { - return "SpringTransactionGetResult{" + - "value=" + value + - ", inner=" + inner + - '}'; - } -} diff --git a/src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java b/src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java index 70a6c9227..4f82a74af 100644 --- a/src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java +++ b/src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java @@ -78,8 +78,4 @@ public interface CouchbaseClientFactory extends Closeable { CoreTransactionAttemptContext getCore(TransactionOptions options, CoreTransactionAttemptContext atr); - - //CouchbaseClientFactory with(CouchbaseTransactionalOperator txOp); - - //CouchbaseTransactionalOperator getTransactionalOperator(); } diff --git a/src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java b/src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java index ca2be1c05..d89ebf675 100644 --- a/src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java +++ b/src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java @@ -103,11 +103,5 @@ public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider* */ ReactiveCouchbaseClientFactory with(CouchbaseTransactionalOperator txOp); - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#isTransactionActive() - */ - boolean isTransactionActive(); - CouchbaseTransactionalOperator getTransactionalOperator(); } diff --git a/src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java b/src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java index f6d702447..4f10af8b9 100644 --- a/src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java +++ b/src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java @@ -59,10 +59,7 @@ public SimpleCouchbaseClientFactory(final String connectionString, final Authent public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator, final String bucketName, final String scopeName) { - this(new OwnedSupplier<>(Cluster.connect(connectionString, ClusterOptions.clusterOptions(authenticator) - // todo gp disabling cleanupLostAttempts to simplify output during development - .environment(env -> env.transactionsConfig( - TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false)))))), + this(new OwnedSupplier<>(Cluster.connect(connectionString, ClusterOptions.clusterOptions(authenticator))), bucketName, scopeName); } diff --git a/src/main/java/org/springframework/data/couchbase/SimpleReactiveCouchbaseClientFactory.java b/src/main/java/org/springframework/data/couchbase/SimpleReactiveCouchbaseClientFactory.java index 7a3402a10..bcd670fed 100644 --- a/src/main/java/org/springframework/data/couchbase/SimpleReactiveCouchbaseClientFactory.java +++ b/src/main/java/org/springframework/data/couchbase/SimpleReactiveCouchbaseClientFactory.java @@ -145,11 +145,6 @@ public ReactiveCouchbaseClientFactory withCore(ReactiveCouchbaseResourceHolder h return new CoreTransactionAttemptContextBoundCouchbaseClientFactory(holder, this, transactions); } - @Override - public boolean isTransactionActive() { - return false; - } - @Override public CouchbaseTransactionalOperator getTransactionalOperator() { return transactionalOperator; @@ -200,7 +195,6 @@ static final class CoreTransactionAttemptContextBoundCouchbaseClientFactory this.delegate = delegate; } - @Override public ClusterInterface getCluster() throws DataAccessException { return decorateDatabase(delegate.getCluster()); @@ -221,7 +215,6 @@ public Scope getScope(String scopeName) { return delegate.getScope(scopeName); } - @Override public Scope getScope() { return delegate.getScope(); } @@ -281,15 +274,6 @@ public ReactiveCouchbaseClientFactory withCore(ReactiveCouchbaseResourceHolder c return delegate.withCore(core); } - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#isTransactionActive() - */ - @Override - public boolean isTransactionActive() { - return transactionResources != null && transactionResources.hasActiveTransaction(); - } - @Override public CouchbaseTransactionalOperator getTransactionalOperator() { return delegate.getTransactionalOperator(); diff --git a/src/main/java/org/springframework/data/couchbase/config/AbstractCouchbaseConfiguration.java b/src/main/java/org/springframework/data/couchbase/config/AbstractCouchbaseConfiguration.java index 80f4e72b9..b98dc2d46 100644 --- a/src/main/java/org/springframework/data/couchbase/config/AbstractCouchbaseConfiguration.java +++ b/src/main/java/org/springframework/data/couchbase/config/AbstractCouchbaseConfiguration.java @@ -18,7 +18,6 @@ import static com.couchbase.client.java.ClusterOptions.clusterOptions; import static org.springframework.data.couchbase.config.BeanNames.COUCHBASE_MAPPING_CONTEXT; -import static org.springframework.data.couchbase.config.BeanNames.COUCHBASE_TRANSACTIONS; import java.time.Duration; import java.util.Collections; @@ -168,9 +167,6 @@ public ClusterEnvironment couchbaseClusterEnvironment() { throw new CouchbaseException("non-shadowed Jackson not present"); } builder.jsonSerializer(JacksonJsonSerializer.create(couchbaseObjectMapper())); - // todo gp only suitable for tests - TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false)); - builder.transactionsConfig(transactionsConfig()); configureEnvironment(builder); return builder.build(); } @@ -378,11 +374,6 @@ public TransactionOptions transactionsOptions(){ return TransactionOptions.transactionOptions(); } - // todo gpx transactions config is now done in standard ClusterConfig - so I think we don't want a separate bean? - public TransactionsConfig.Builder transactionsConfig(){ - return TransactionsConfig.builder().durabilityLevel(DurabilityLevel.NONE).timeout(Duration.ofMinutes(20));// for testing - } - /** * Blocking Transaction Manager * diff --git a/src/main/java/org/springframework/data/couchbase/config/BeanNames.java b/src/main/java/org/springframework/data/couchbase/config/BeanNames.java index 100c841e5..41bccb95a 100644 --- a/src/main/java/org/springframework/data/couchbase/config/BeanNames.java +++ b/src/main/java/org/springframework/data/couchbase/config/BeanNames.java @@ -34,8 +34,6 @@ public class BeanNames { public static final String COUCHBASE_CUSTOM_CONVERSIONS = "couchbaseCustomConversions"; - public static final String COUCHBASE_TRANSACTIONS = "couchbaseTransactions"; - /** * The name for the bean that stores custom mapping between repositories and their backing couchbaseOperations. */ diff --git a/src/main/java/org/springframework/data/couchbase/core/AbstractTemplateSupport.java b/src/main/java/org/springframework/data/couchbase/core/AbstractTemplateSupport.java index 8164f2cd2..685d7a8a0 100644 --- a/src/main/java/org/springframework/data/couchbase/core/AbstractTemplateSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/AbstractTemplateSupport.java @@ -192,16 +192,6 @@ ConvertingPropertyAccessor getPropertyAccessor(final T source) { return new ConvertingPropertyAccessor<>(accessor, converter.getConversionService()); } - public Integer getTxResultKey(T source) { - final CouchbasePersistentEntity persistentEntity = mappingContext.getRequiredPersistentEntity(source.getClass()); - final CouchbasePersistentProperty transactionResultProperty = persistentEntity.transactionResultProperty(); - if (transactionResultProperty == null) { - throw new CouchbaseException("the entity class " + source.getClass() - + " does not have a property required for transactions:\n\t@TransactionResult TransactionResultHolder txResultHolder"); - } - return getPropertyAccessor(source).getProperty(transactionResultProperty, Integer.class); - } - public void maybeEmitEvent(CouchbaseMappingEvent event) { if (canPublishEvent()) { try { diff --git a/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java b/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java index 322bf4f73..ac2c1c675 100644 --- a/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java +++ b/src/main/java/org/springframework/data/couchbase/core/CouchbaseTemplate.java @@ -56,7 +56,6 @@ public class CouchbaseTemplate implements CouchbaseOperations, ApplicationContex private final ReactiveCouchbaseTemplate reactiveCouchbaseTemplate; private final QueryScanConsistency scanConsistency; private @Nullable CouchbasePersistentEntityIndexCreator indexCreator; - private CouchbaseTransactionalOperator couchbaseTransactionalOperator; public CouchbaseTemplate(final CouchbaseClientFactory clientFactory, final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory, final CouchbaseConverter converter) { @@ -233,30 +232,4 @@ private void prepareIndexCreator(final ApplicationContext context) { public TemplateSupport support() { return templateSupport; } - - public CouchbaseTemplate with(CouchbaseTransactionalOperator couchbaseTransactionalOperator) { - this.couchbaseTransactionalOperator = couchbaseTransactionalOperator; - return this; - } - - /** - * Get the TransactionalOperator from
- * 1. The template.clientFactory
- * 2. The template.threadLocal
- * 3. otherwise null
- * This can be overriden in the operation method by
- * 1. repository.withCollection() - *//* - private CouchbaseStuffHandle getTransactionalOperator() { - if (this.getCouchbaseClientFactory().getTransactionalOperator() != null) { - return this.getCouchbaseClientFactory().getTransactionalOperator(); - } - ReactiveCouchbaseTemplate t = this.reactive(); - PseudoArgs pArgs = t.getPseudoArgs(); - if (pArgs != null && pArgs.getTxOp() != null) { - return pArgs.getTxOp(); - } - return null; - } - */ } diff --git a/src/main/java/org/springframework/data/couchbase/core/NonReactiveSupportWrapper.java b/src/main/java/org/springframework/data/couchbase/core/NonReactiveSupportWrapper.java index 2020a0d43..7b2c9dd4e 100644 --- a/src/main/java/org/springframework/data/couchbase/core/NonReactiveSupportWrapper.java +++ b/src/main/java/org/springframework/data/couchbase/core/NonReactiveSupportWrapper.java @@ -81,11 +81,6 @@ public String getJavaNameForEntity(Class clazz) { return support.getJavaNameForEntity(clazz); } - @Override - public Integer getTxResultHolder(T source) { - return support.getTxResultHolder(source); - } - @Override public TranslationService getTranslationService() { return support.getTranslationService(); diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java index 98a5c6546..c9750db4a 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java @@ -43,8 +43,6 @@ import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; -import com.couchbase.client.core.transaction.CoreTransactionAttemptContext; -import com.couchbase.client.java.ClusterInterface; import com.couchbase.client.java.Collection; import com.couchbase.client.java.query.QueryScanConsistency; @@ -254,72 +252,12 @@ public QueryScanConsistency getConsistency() { return scanConsistency; } - protected Mono doGetDatabase() { - return ReactiveCouchbaseClientUtils.getDatabase(clientFactory, SessionSynchronization.ON_ACTUAL_TRANSACTION); - } protected Mono doGetTemplate() { return ReactiveCouchbaseClientUtils.getTemplate(clientFactory, SessionSynchronization.ON_ACTUAL_TRANSACTION, this.getConverter()); } - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#withSession(com.mongodb.session.ClientSession) - */ - public ReactiveCouchbaseOperations withResources(ReactiveCouchbaseResourceHolder resources) { - return new ReactiveResourcesBoundCouchbaseTemplate(resources, ReactiveCouchbaseTemplate.this); - } - - /** - * {@link CouchbaseTemplate} extension bound to a specific {@link CoreTransactionAttemptContext} that is applied when - * interacting with the server through the driver API.
- * The prepare steps for {} and {} proxy the target and invoke the desired target method matching the actual arguments - * plus a {@link CoreTransactionAttemptContext}. - * - * @author Christoph Strobl - * @since 2.1 - */ - static class ReactiveResourcesBoundCouchbaseTemplate extends ReactiveCouchbaseTemplate { - - private final ReactiveCouchbaseTemplate delegate; - private final ReactiveCouchbaseResourceHolder holder; - - /** - * @param holder must not be {@literal null}. - * @param that must not be {@literal null}. - */ - ReactiveResourcesBoundCouchbaseTemplate(ReactiveCouchbaseResourceHolder holder, ReactiveCouchbaseTemplate that) { - - super(that.clientFactory.withCore(holder), that.getConverter()); - - this.delegate = that; - this.holder = holder; - } - - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getCollection(java.lang.String) - */ - @Override - public Collection getCollection(String collectionName) { - - // native MongoDB objects that offer methods with ClientSession must not be proxied. - return delegate.getCollection(collectionName); - } - - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getMongoDatabase() - */ - @Override - public ReactiveCouchbaseClientFactory getCouchbaseClientFactory() { - - // native MongoDB objects that offer methods with ClientSession must not be proxied. - return delegate.getCouchbaseClientFactory(); - } - } - class IndexCreatorEventListener implements ApplicationListener> { final Consumer subscriptionExceptionHandler; @@ -344,27 +282,6 @@ public void onApplicationEvent(MappingContextEvent event) { } } - /** - * Get the TransactionalOperator from
- * 1. The template.clientFactory
- * 2. The template.threadLocal
- * 3. otherwise null
- * This can be overriden in the operation method by
- * 1. repository.withCollection() - */ - /* - private CouchbaseStuffHandle getTransactionalOperator() { - if (this.getCouchbaseClientFactory().getTransactionalOperator() != null) { - return this.getCouchbaseClientFactory().getTransactionalOperator(); - } - ReactiveCouchbaseTemplate t = this; - PseudoArgs pArgs = t.getPseudoArgs(); - if (pArgs != null && pArgs.getTxOp() != null) { - return pArgs.getTxOp(); - } - return null; - } - */ /** * Value object chaining together a given source document with its mapped representation and the collection to persist * it to. diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateSupport.java index 7d73839c6..cb372aaf8 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplateSupport.java @@ -93,10 +93,6 @@ public Mono applyResult(T entity, CouchbaseDocument converted, Object id, return Mono.fromSupplier(() -> applyResultBase(entity, converted, id, cas, txResultHolder, holder)); } - @Override - public Integer getTxResultHolder(T source) { - return null; - } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java index 8429c359f..7887d0b35 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveInsertByIdOperationSupport.java @@ -190,8 +190,6 @@ public InsertByIdInScope withDurability(final DurabilityLevel durabilityLevel durabilityLevel, expiry, txCtx, support); } - // todo gpx need to figure out how to handle options re transactions. E.g. many non-transactional insert options, - // like this, aren't supported @Override public InsertByIdInScope withDurability(final PersistTo persistTo, final ReplicateTo replicateTo) { Assert.notNull(persistTo, "PersistTo must not be null."); diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveRemoveByIdOperationSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveRemoveByIdOperationSupport.java index 5d71a5799..b7e1cb8e6 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveRemoveByIdOperationSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveRemoveByIdOperationSupport.java @@ -94,9 +94,7 @@ public Mono one(final String id) { ReactiveCollection rc = clientFactory.withScope(pArgs.getScope()).getCollection(pArgs.getCollection()) .reactive(); Mono tmpl = template.doGetTemplate(); - final Mono removeResult; - // todo gpx convert to TransactionalSupport Mono allResult = tmpl.flatMap(tp -> tp.getCouchbaseClientFactory().getResourceHolderMono().flatMap(s -> { if (s.getCore() == null) { System.err.println("non-tx remove"); diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveSessionCallback.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveSessionCallback.java deleted file mode 100644 index c91caa162..000000000 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveSessionCallback.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.springframework.data.couchbase.core; -/* - * Copyright 2018-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.reactivestreams.Publisher; - -/** - * Callback interface for executing operations within a {@link com.mongodb.reactivestreams.client.ClientSession} using - * reactive infrastructure. - * - * @author Christoph Strobl - * @since 2.1 - * @see com.mongodb.reactivestreams.client.ClientSession - */ -@FunctionalInterface -public interface ReactiveSessionCallback { - - /** - * Execute operations against a MongoDB instance via session bound {@link ReactiveMongoOperations}. The session is - * inferred directly into the operation so that no further interaction is necessary.
- * Please note that only Spring Data-specific abstractions like {@link ReactiveMongoOperations#find(Query, Class)} and - * others are enhanced with the {@link com.mongodb.session.ClientSession}. When obtaining plain MongoDB gateway - * objects like {@link com.mongodb.reactivestreams.client.MongoCollection} or - * {@link com.mongodb.reactivestreams.client.MongoDatabase} via eg. - * {@link ReactiveMongoOperations#getCollection(String)} we leave responsibility for - * {@link com.mongodb.session.ClientSession} again up to the caller. - * - * @param operations will never be {@literal null}. - * @return never {@literal null}. - */ - Publisher doInSession(ReactiveCouchbaseOperations operations); -} diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveTemplateSupport.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveTemplateSupport.java index 1313a646d..afde8fd6a 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveTemplateSupport.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveTemplateSupport.java @@ -47,9 +47,5 @@ Mono applyResult(T entity, CouchbaseDocument converted, Object id, Long c String getJavaNameForEntity(Class clazz); - Integer getTxResultHolder(T source); - - // Integer setTxResultHolder(T source); - TranslationService getTranslationService(); } diff --git a/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseDocument.java b/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseDocument.java index 02566f767..ead8146ed 100644 --- a/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseDocument.java +++ b/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseDocument.java @@ -56,9 +56,6 @@ public class CouchbaseDocument implements CouchbaseStorable { */ private int expiration; - // todo gp - public long version; - /** * Creates a completely empty {@link CouchbaseDocument}. */ diff --git a/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java b/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java index aacc2a4f0..748d429eb 100644 --- a/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java +++ b/src/main/java/org/springframework/data/couchbase/core/query/OptionsBuilder.java @@ -98,6 +98,7 @@ public static TransactionQueryOptions buildTransactionQueryOptions(QueryOptions TransactionQueryOptions txOptions = TransactionQueryOptions.queryOptions(); JsonObject optsJson = getQueryOpts(built); + // todo gpx is this compatible with all forms of named and positional parameters? won't be compatible with JsonSerializer. maybe can put some support into SDK for (Map.Entry entry : optsJson.toMap().entrySet()) { txOptions.raw(entry.getKey(), entry.getValue()); } diff --git a/src/main/java/org/springframework/data/couchbase/repository/TransactionMeta.java b/src/main/java/org/springframework/data/couchbase/repository/TransactionMeta.java deleted file mode 100644 index ae33ddefa..000000000 --- a/src/main/java/org/springframework/data/couchbase/repository/TransactionMeta.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2021 the original author or authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.couchbase.repository; - -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import org.springframework.data.annotation.QueryAnnotation; - -/** - * Indicates the field should hold key to lookup the TransactionGetResult and should NOT be considered part of the - * document. - * - * @author Michael Reiche - */ -@Retention(RetentionPolicy.RUNTIME) -@Target({ ElementType.FIELD }) -@Documented -@QueryAnnotation -public @interface TransactionMeta { - - String value() default ""; - -} diff --git a/src/main/java/org/springframework/data/couchbase/repository/TransactionResult.java b/src/main/java/org/springframework/data/couchbase/repository/TransactionResult.java index d2236d520..5413ac5e9 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/TransactionResult.java +++ b/src/main/java/org/springframework/data/couchbase/repository/TransactionResult.java @@ -28,7 +28,7 @@ * * @author Michael Reiche */ -// todo gp can we give this a different name since there is an existing TransactionResult +// todo gp I think this is no longer used - can we remove? @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.FIELD }) @Documented diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java deleted file mode 100644 index 19ae0a921..000000000 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java +++ /dev/null @@ -1,296 +0,0 @@ -///* -// * Copyright 2021 the original author or authors -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * https://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package org.springframework.data.couchbase.transaction; -// -//import com.couchbase.client.core.error.transaction.TransactionOperationFailedException; -//import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext; -//import com.couchbase.client.java.transactions.TransactionResult; -//import reactor.core.publisher.Mono; -// -//import java.time.Duration; -//import java.time.temporal.ChronoUnit; -//import java.util.concurrent.atomic.AtomicReference; -// -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -//import org.springframework.beans.factory.DisposableBean; -//import org.springframework.data.couchbase.CouchbaseClientFactory; -//import org.springframework.data.couchbase.ReactiveCouchbaseClientFactory; -//import org.springframework.data.couchbase.core.CouchbaseTemplate; -//import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; -//import org.springframework.transaction.TransactionDefinition; -//import org.springframework.transaction.TransactionException; -//import org.springframework.transaction.reactive.TransactionContextManager; -//import org.springframework.transaction.reactive.TransactionSynchronizationManager; -//import org.springframework.transaction.support.AbstractPlatformTransactionManager; -//import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager; -//import org.springframework.transaction.support.DefaultTransactionStatus; -//import org.springframework.transaction.support.ResourceTransactionManager; -//import org.springframework.transaction.support.SmartTransactionObject; -//import org.springframework.transaction.support.TransactionCallback; -//import org.springframework.transaction.support.TransactionSynchronizationUtils; -//import org.springframework.util.Assert; -// -///** -// * Blocking TransactionManager -// * -// * @author Michael Nitschinger -// * @author Michael Reiche -// */ -// -//public class CouchbaseCallbackTransactionManager extends AbstractPlatformTransactionManager -// implements DisposableBean, ResourceTransactionManager, CallbackPreferringPlatformTransactionManager { -// -// private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTransactionManager.class); -// -// private final CouchbaseTemplate template; -// private final ReactiveCouchbaseTemplate reactiveTemplate; -// private final ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory; -// private final CouchbaseClientFactory couchbaseClientFactory; -// -// private ReactiveCouchbaseTransactionManager.ReactiveCouchbaseTransactionObject transaction; -// -// public CouchbaseCallbackTransactionManager(CouchbaseTemplate template, ReactiveCouchbaseTemplate reactiveTemplate) { -// this.template = template; -// this.reactiveTemplate = reactiveTemplate; -// this.reactiveCouchbaseClientFactory = this.reactiveTemplate.getCouchbaseClientFactory(); -// this.couchbaseClientFactory = this.template.getCouchbaseClientFactory(); -// } -// -// public ReactiveCouchbaseTemplate template() { -// return reactiveTemplate; -// } -// -// private CouchbaseResourceHolder newResourceHolder(TransactionDefinition definition, ClientSessionOptions options, -// ReactiveTransactionAttemptContext atr) { -// -// CouchbaseClientFactory databaseFactory = template.getCouchbaseClientFactory(); -// -// CouchbaseResourceHolder resourceHolder = new CouchbaseResourceHolder( -// databaseFactory.getSession(options, atr), databaseFactory); -// return resourceHolder; -// } -// -// @Override -// public T execute(TransactionDefinition definition, TransactionCallback callback) throws TransactionException { -// final AtomicReference execResult = new AtomicReference<>(); -// AtomicReference startTime = new AtomicReference<>(0L); -// -// Mono txnResult = couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> { -// /* begin spring-data-couchbase transaction 1/2 */ -// ClientSession clientSession = reactiveCouchbaseClientFactory // couchbaseClientFactory -// .getSession(ClientSessionOptions.builder().causallyConsistent(true).build()) -// .block(); -// ReactiveCouchbaseResourceHolder reactiveResourceHolder = new ReactiveCouchbaseResourceHolder(clientSession, -// reactiveCouchbaseClientFactory); -// -// CouchbaseResourceHolder resourceHolder = new CouchbaseResourceHolder(clientSession, -// template.getCouchbaseClientFactory()); -// -// Mono sync = TransactionContextManager.currentContext() -// .map(TransactionSynchronizationManager::new) -// . flatMap(synchronizationManager -> { -// System.err.println("CallbackTransactionManager: " + this); -// System.err.println("bindResource: " + reactiveCouchbaseClientFactory.getCluster().block()); -// // todo gp not sure why we bind, unbind, bind again? -// // todo msr - to avoid the NotBound exception on unbind. Should use unbindIfPossible. -// synchronizationManager.bindResource(reactiveCouchbaseClientFactory.getCluster().block(), -// reactiveResourceHolder); -// org.springframework.transaction.support.TransactionSynchronizationManager -// .unbindResourceIfPossible(reactiveCouchbaseClientFactory.getCluster().block()); -// org.springframework.transaction.support.TransactionSynchronizationManager -// .bindResource(reactiveCouchbaseClientFactory.getCluster().block(), resourceHolder); -// ReactiveCouchbaseTransactionManager.ReactiveCouchbaseTransactionObject transaction = new ReactiveCouchbaseTransactionManager.ReactiveCouchbaseTransactionObject( -// reactiveResourceHolder); -// setTransaction(transaction); -// -// // todo gp experimenting with replacing the ClientSession, the ReactiveCouchbaseTransactionObject, -// // the resource holders etc., with just storing the TransactionAttemptContext. -// synchronizationManager.bindResource(ReactiveTransactionAttemptContext.class, ctx); -// -// /* end spring-data-couchbase transaction 1/2 */ -// -// // todo gp do we need TransactionSynchronizationManager.forCurrentTransaction()? as we already have synchronizationManager -// Mono result = TransactionSynchronizationManager.forCurrentTransaction().flatMap((sm) -> { -// // todo gp not sure why re-binding again? -// sm.unbindResourceIfPossible(reactiveCouchbaseClientFactory.getCluster().block()); -// sm.bindResource(reactiveCouchbaseClientFactory.getCluster().block(), -// reactiveResourceHolder); -// CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(transaction, true, false, false, true, null, sm); -// prepareSynchronization(status, new CouchbaseTransactionDefinition()); -// // System.err.println("deferContextual.ctx : " + xxx); -// //Mono cxView = Mono.deferContextual(cx -> { System.err.println("CallbackTransactionManager.cx: "+cx); return Mono.just(cx);}); -// try { -// // Since we are on a different thread now transparently, at least make sure -// // that the original method invocation is synchronized. -// synchronized (this) { -// // todo gp this will execute the lambda, and so we likely don't want that to be inside a synchronized block -// execResult.set(callback.doInTransaction(status)); -// } -// } catch (RuntimeException e) { -// throw e; -// } catch (Throwable e) { -// throw new RuntimeException(e); -// } -// return Mono.empty(); -// }).contextWrite(TransactionContextManager.getOrCreateContext()) // this doesn't create a context on the desired publisher -// .contextWrite(TransactionContextManager.getOrCreateContextHolder()).then(); -// -// // todo gp this isn't part of the chain (no `result = result.onErrorResume...`) so isn't called -// // and presumably isn't needed? -//// result.onErrorResume(err -> { -//// AttemptContextReactiveAccessor.getLogger(ctx).info(ctx.attemptId(), -//// "caught exception '%s' in async, rethrowing", err); -//// return Mono.error(ctx.TransactionOperationFailedException.convertToOperationFailedIfNeeded(err, ctx)); -//// }).thenReturn(ctx); -// -// return result.then(Mono.just(synchronizationManager)); -// }); -// /* begin spring-data-couchbase transaction 2/2 */ // this doesn't create a context on the desired publisher -// return sync.contextWrite(TransactionContextManager.getOrCreateContext()) -// .contextWrite(TransactionContextManager.getOrCreateContextHolder()).then(); -// /* end spring-data-couchbase transaction 2/2 */ -// }).doOnSubscribe(v -> startTime.set(System.nanoTime())); -// -// txnResult.block(); -// return execResult.get(); // transactions.reactive().executeTransaction(merged,overall,ob).doOnNext(v->overall.span().finish()).doOnError(err->overall.span().failWith(err));}); -// -// } -// -// private void setTransaction(ReactiveCouchbaseTransactionManager.ReactiveCouchbaseTransactionObject transaction) { -// this.transaction = transaction; -// } -// -// @Override -// protected ReactiveCouchbaseTransactionManager.ReactiveCouchbaseTransactionObject doGetTransaction() -// throws TransactionException { -// /* -// CouchbaseResourceHolder resourceHolder = (CouchbaseResourceHolder) TransactionSynchronizationManager -// .getResource(template.getCouchbaseClientFactory()); -// return new CouchbaseTransactionManager.CouchbaseTransactionObject(resourceHolder); -// */ -// return (ReactiveCouchbaseTransactionManager.ReactiveCouchbaseTransactionObject) transaction; -// } -// -// @Override -// protected boolean isExistingTransaction(Object transaction) throws TransactionException { -// return extractTransaction(transaction).hasResourceHolder(); -// } -// -// @Override -// protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { -// LOGGER.debug("Beginning Couchbase Transaction with Definition {}", definition); -// } -// -// @Override -// protected void doCommit(DefaultTransactionStatus status) throws TransactionException { -// LOGGER.debug("Committing Couchbase Transaction with status {}", status); -// } -// -// @Override -// protected void doRollback(DefaultTransactionStatus status) throws TransactionException { -// LOGGER.warn("Rolling back Couchbase Transaction with status {}", status); -// org.springframework.transaction.support.TransactionSynchronizationManager -// .unbindResource(reactiveCouchbaseClientFactory); -// } -// -// @Override -// protected void doCleanupAfterCompletion(Object transaction) { -// LOGGER.trace("Performing cleanup of Couchbase Transaction {}", transaction); -// org.springframework.transaction.support.TransactionSynchronizationManager -// .unbindResource(reactiveCouchbaseClientFactory); -// return; -// } -// -// @Override -// public void destroy() { -// } -// -// @Override -// public Object getResourceFactory() { -// return reactiveTemplate.getCouchbaseClientFactory(); -// } -// -// private static CouchbaseTransactionObject extractTransaction(Object transaction) { -// Assert.isInstanceOf(CouchbaseTransactionObject.class, transaction, -// () -> String.format("Expected to find a %s but it turned out to be %s.", CouchbaseTransactionObject.class, -// transaction.getClass())); -// -// return (CouchbaseTransactionObject) transaction; -// } -// /* -// public class CouchbaseResourceHolder extends ResourceHolderSupport { -// -// private volatile ReactiveTransactionAttemptContext attemptContext; -// //private volatile TransactionResultMap resultMap = new TransactionResultMap(template); -// -// public CouchbaseResourceHolder(ReactiveTransactionAttemptContext attemptContext) { -// this.attemptContext = attemptContext; -// } -// -// public ReactiveTransactionAttemptContext getAttemptContext() { -// return attemptContext; -// } -// -// public void setAttemptContext(ReactiveTransactionAttemptContext attemptContext) { -// this.attemptContext = attemptContext; -// } -// -// //public TransactionResultMap getTxResultMap() { -// // return resultMap; -// //} -// -// @Override -// public String toString() { -// return "CouchbaseResourceHolder{" + "attemptContext=" + attemptContext + "}"; -// } -// } -// -// */ -// -// protected static class CouchbaseTransactionObject implements SmartTransactionObject { -// -// private final CouchbaseResourceHolder resourceHolder; -// -// CouchbaseTransactionObject(CouchbaseResourceHolder resourceHolder) { -// this.resourceHolder = resourceHolder; -// } -// -// @Override -// public boolean isRollbackOnly() { -// return this.resourceHolder != null && this.resourceHolder.isRollbackOnly(); -// } -// -// @Override -// public void flush() { -// TransactionSynchronizationUtils.triggerFlush(); -// } -// -// public boolean hasResourceHolder() { -// return resourceHolder != null; -// } -// -// @Override -// public String toString() { -// return "CouchbaseTransactionObject{" + "resourceHolder=" + resourceHolder + '}'; -// } -// } -// -// private static Duration now() { -// return Duration.of(System.nanoTime(), ChronoUnit.NANOS); -// } -// -//} diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceHolderx.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceHolderx.java deleted file mode 100644 index a4ba04574..000000000 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceHolderx.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2019-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.data.couchbase.transaction; - -import org.springframework.data.couchbase.CouchbaseClientFactory; -import org.springframework.data.couchbase.core.CouchbaseTemplate; -import org.springframework.lang.Nullable; -import org.springframework.transaction.support.ResourceHolderSupport; - -import com.couchbase.client.core.transaction.CoreTransactionAttemptContext; - -/** - * MongoDB specific resource holder, wrapping a {@link CoreTransactionAttemptContext}. - * {@link ReactiveCouchbaseTransactionManager} binds instances of this class to the subscriber context. - *

- * Note: Intended for internal usage only. - * - * @author Mark Paluch - * @author Christoph Strobl - * @since 2.2 - * @see CouchbaseTransactionManager - * @see CouchbaseTemplate - */ -// todo gp understand why this is needed - can we not just hold ctx in Mono context? -public class CouchbaseResourceHolderx extends ResourceHolderSupport { - - private @Nullable CoreTransactionAttemptContext core; // which holds the atr - private CouchbaseClientFactory databaseFactory; - - /** - * Create a new {@link org.springframework.data.couchbase.transaction.ReactiveCouchbaseResourceHolder} for a given - * {@link CoreTransactionAttemptContext session}. - * - * @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}. - * @param databaseFactory the associated {@link CouchbaseClientFactory}. must not be {@literal null}. - */ - public CouchbaseResourceHolderx(@Nullable CoreTransactionAttemptContext core, CouchbaseClientFactory databaseFactory) { - - this.core = core; - this.databaseFactory = databaseFactory; - } - - /** - * @return the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}. - */ - @Nullable - public CoreTransactionAttemptContext getCore() { - return core; - } - - /** - * @return the required associated {@link CoreTransactionAttemptContext}. - * @throws IllegalStateException if no session is associated. - */ - CoreTransactionAttemptContext getRequiredSession() { - - CoreTransactionAttemptContext session = getCore(); - - if (session == null) { - throw new IllegalStateException("No ClientSession associated"); - } - return session; - } - - /** - * @return the associated {@link CouchbaseClientFactory}. - */ - public CouchbaseClientFactory getDatabaseFactory() { - return databaseFactory; - } - - /** - * Set the {@link CoreTransactionAttemptContext} to guard. - * - * @param core can be {@literal null}. - */ - public void setCore(@Nullable CoreTransactionAttemptContext core) { - this.core = core; - } - - /** - * @return {@literal true} if session is not {@literal null}. - */ - boolean hasCore() { - return core != null; - } - - /** - * If the {@link org.springframework.data.couchbase.transaction.ReactiveCouchbaseResourceHolder} is {@link #hasCore()} - * not already associated} with a {@link CoreTransactionAttemptContext} the given value is - * {@link #setCore(CoreTransactionAttemptContext) set} and returned, otherwise the current bound session is returned. - * - * @param core - * @return - */ - @Nullable - public CoreTransactionAttemptContext setSessionIfAbsent(@Nullable CoreTransactionAttemptContext core) { - - if (!hasCore()) { - setCore(core); - } - - return core; - } - -} diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseSimpleTransactionManager.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseSimpleTransactionManager.java deleted file mode 100644 index 1947cb7a2..000000000 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseSimpleTransactionManager.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2018-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.data.couchbase.transaction; - -import org.springframework.data.couchbase.CouchbaseClientFactory; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.TransactionSystemException; - -// todo gp experimenting with the simplest possible class, extending PlatformTransactionManager not AbstractPlatformTransactionManager -public class CouchbaseSimpleTransactionManager implements PlatformTransactionManager { - - private final CouchbaseClientFactory clientFactory; - - public CouchbaseSimpleTransactionManager(CouchbaseClientFactory clientFactory) { - this.clientFactory = clientFactory; - } - - @Override - public TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { - return null; - } - - @Override - public void commit(TransactionStatus status) throws TransactionException { - // todo gp what here - do we need to re-allow explicit commit? how to handle retries of this part? - } - - @Override - public void rollback(TransactionStatus status) throws TransactionException { - // todo gp same as commit() - } -} diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionDefinition.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionDefinition.java index d46d73bc7..d879ac645 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionDefinition.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionDefinition.java @@ -1,37 +1,18 @@ package org.springframework.data.couchbase.transaction; import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext; -import com.couchbase.client.java.transactions.TransactionAttemptContext; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.support.DefaultTransactionDefinition; public class CouchbaseTransactionDefinition extends DefaultTransactionDefinition { ReactiveTransactionAttemptContext atr; - TransactionAttemptContext at; public CouchbaseTransactionDefinition(){ super(); setIsolationLevel(ISOLATION_READ_COMMITTED); } - public CouchbaseTransactionDefinition(TransactionDefinition that) { - super(that); - } - - public CouchbaseTransactionDefinition(int propagationBehavior) { - super(propagationBehavior); - } - public void setAttemptContextReactive(ReactiveTransactionAttemptContext atr){ this.atr = atr; } - - public ReactiveTransactionAttemptContext getAttemptContextReactive(){ - return atr; - } - - public void setAttemptContext(TransactionAttemptContext attemptContext) { - at = attemptContext; - } } diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionManager.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionManager.java index 3f5d7a393..f161049c3 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionManager.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionManager.java @@ -474,11 +474,6 @@ public CoreTransactionAttemptContext getCore() { return resourceHolder != null ? resourceHolder.getCore() : null; } - private ReactiveCouchbaseResourceHolder getRequiredResourceHolder() { - Assert.state(resourceHolder != null, "CouchbaseResourceHolder is required but not present. o_O"); - return resourceHolder; - } - private CoreTransactionAttemptContext getRequiredCore() { CoreTransactionAttemptContext core = getCore(); Assert.state(core != null, "A Core is required but it turned out to be null."); diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionStatus.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionStatus.java index 5331cea53..23635bd32 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionStatus.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionStatus.java @@ -5,7 +5,6 @@ public class CouchbaseTransactionStatus extends DefaultTransactionStatus { - final TransactionSynchronizationManager transactionSynchronizationManager; /** * Create a new {@code DefaultTransactionStatus} instance. * @@ -28,6 +27,5 @@ public CouchbaseTransactionStatus(Object transaction, boolean newTransaction, bo readOnly, debug, suspendedResources); - transactionSynchronizationManager = sm; } } diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionalOperator.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionalOperator.java index c64cc47a8..b31748259 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionalOperator.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionalOperator.java @@ -89,8 +89,6 @@ public Mono apply(CouchbaseTransactionalOperator couchbaseTransactionalOpe */ public Mono reactive(Function> transactionLogic, boolean commit) { -// // todo gp this needs access to a Cluster -// return Mono.empty(); return ((ReactiveCouchbaseTransactionManager) transactionManager).getDatabaseFactory().getCluster().reactive().transactions().run(ctx -> { setAttemptContextReactive(ctx); // for getTxOp().getCtx() in Reactive*OperationSupport // for transactional(), transactionDefinition.setAtr(ctx) is called at the beginning of that method @@ -99,16 +97,6 @@ public Mono reactive(Function> R repository(R repo) { if (!(repo.getOperations() instanceof ReactiveCouchbaseOperations)) { throw new CouchbaseException("Repository must be a Reactive Couchbase repository" + repo); diff --git a/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseClientUtils.java b/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseClientUtils.java index 7f7de3736..8e648c1c7 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseClientUtils.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseClientUtils.java @@ -17,115 +17,11 @@ public class ReactiveCouchbaseClientUtils { - /** - * Check if the {@link ReactiveMongoDatabaseFactory} is actually bound to a - * {@link com.mongodb.reactivestreams.client.ClientSession} that has an active transaction, or if a - * {@link org.springframework.transaction.reactive.TransactionSynchronization} has been registered for the - * {@link ReactiveMongoDatabaseFactory resource} and if the associated - * {@link com.mongodb.reactivestreams.client.ClientSession} has an - * {@link com.mongodb.reactivestreams.client.ClientSession#hasActiveTransaction() active transaction}. - * - * @param databaseFactory the resource to check transactions for. Must not be {@literal null}. - * @return a {@link Mono} emitting {@literal true} if the factory has an ongoing transaction. - */ - public static Mono isTransactionActive(ReactiveCouchbaseClientFactory databaseFactory) { - - if (databaseFactory.isTransactionActive()) { - return Mono.just(true); - } - - return TransactionSynchronizationManager.forCurrentTransaction() // - .map(it -> { - - ReactiveCouchbaseResourceHolder holder = (ReactiveCouchbaseResourceHolder) it.getResource(databaseFactory); - return holder != null && holder.hasActiveTransaction(); - }) // - .onErrorResume(NoTransactionException.class, e -> Mono.just(false)); - } - - /** - * Obtain the default {@link MongoDatabase database} form the given {@link ReactiveMongoDatabaseFactory factory} using - * {@link SessionSynchronization#ON_ACTUAL_TRANSACTION native session synchronization}.
- * Registers a {@link MongoSessionSynchronization MongoDB specific transaction synchronization} within the subscriber - * {@link Context} if {@link TransactionSynchronizationManager#isSynchronizationActive() synchronization is active}. - * - * @param factory the {@link ReactiveMongoDatabaseFactory} to get the {@link MongoDatabase} from. - * @return the {@link MongoDatabase} that is potentially associated with a transactional {@link ClientSession}. - */ - public static Mono getDatabase(ReactiveCouchbaseClientFactory factory) { - return doGetCouchbaseCluster(null, factory, SessionSynchronization.ON_ACTUAL_TRANSACTION); - } - - /** - * Obtain the default {@link MongoDatabase database} form the given {@link ReactiveMongoDatabaseFactory factory}. - *
- * Registers a {@link MongoSessionSynchronization MongoDB specific transaction synchronization} within the subscriber - * {@link Context} if {@link TransactionSynchronizationManager#isSynchronizationActive() synchronization is active}. - * - * @param factory the {@link ReactiveMongoDatabaseFactory} to get the {@link MongoDatabase} from. - * @param sessionSynchronization the synchronization to use. Must not be {@literal null}. - * @return the {@link MongoDatabase} that is potentially associated with a transactional {@link ClientSession}. - */ - public static Mono getDatabase(ReactiveCouchbaseClientFactory factory, - SessionSynchronization sessionSynchronization) { - return doGetCouchbaseCluster(null, factory, sessionSynchronization); - } - public static Mono getTemplate(ReactiveCouchbaseClientFactory factory, SessionSynchronization sessionSynchronization, CouchbaseConverter converter) { return doGetCouchbaseTemplate(null, factory, sessionSynchronization, converter); } - /** - * Obtain the {@link MongoDatabase database} with given name form the given {@link ReactiveMongoDatabaseFactory - * factory} using {@link SessionSynchronization#ON_ACTUAL_TRANSACTION native session synchronization}.
- * Registers a {@link MongoSessionSynchronization MongoDB specific transaction synchronization} within the subscriber - * {@link Context} if {@link TransactionSynchronizationManager#isSynchronizationActive() synchronization is active}. - * - * @param dbName the name of the {@link MongoDatabase} to get. - * @param factory the {@link ReactiveMongoDatabaseFactory} to get the {@link MongoDatabase} from. - * @return the {@link MongoDatabase} that is potentially associated with a transactional {@link ClientSession}. - */ - public static Mono getDatabase(String dbName, ReactiveCouchbaseClientFactory factory) { - return doGetCouchbaseCluster(dbName, factory, SessionSynchronization.ON_ACTUAL_TRANSACTION); - } - - /** - * Obtain the {@link MongoDatabase database} with given name form the given {@link ReactiveMongoDatabaseFactory - * factory}.
- * Registers a {@link MongoSessionSynchronization MongoDB specific transaction synchronization} within the subscriber - * {@link Context} if {@link TransactionSynchronizationManager#isSynchronizationActive() synchronization is active}. - * - * @param dbName the name of the {@link MongoDatabase} to get. - * @param factory the {@link ReactiveMongoDatabaseFactory} to get the {@link MongoDatabase} from. - * @param sessionSynchronization the synchronization to use. Must not be {@literal null}. - * @return the {@link MongoDatabase} that is potentially associated with a transactional {@link ClientSession}. - */ - public static Mono getCluster(String dbName, ReactiveCouchbaseClientFactory factory, - SessionSynchronization sessionSynchronization) { - return doGetCouchbaseCluster(dbName, factory, sessionSynchronization); - } - - private static Mono doGetCouchbaseCluster(@Nullable String dbName, - ReactiveCouchbaseClientFactory factory, SessionSynchronization sessionSynchronization) { - - Assert.notNull(factory, "DatabaseFactory must not be null!"); - - if (sessionSynchronization == SessionSynchronization.NEVER) { - return getCouchbaseClusterOrDefault(dbName, factory); - } - - return TransactionSynchronizationManager.forCurrentTransaction() - .filter(TransactionSynchronizationManager::isSynchronizationActive) // - .flatMap(synchronizationManager -> { - - return doGetSession(synchronizationManager, factory, sessionSynchronization) // - .flatMap(it -> getCouchbaseClusterOrDefault(dbName, factory.withCore(it))); - }) // - .onErrorResume(NoTransactionException.class, e -> getCouchbaseClusterOrDefault(dbName, factory)) // hitting this - .switchIfEmpty(getCouchbaseClusterOrDefault(dbName, factory)); - } - private static Mono doGetCouchbaseTemplate(@Nullable String dbName, ReactiveCouchbaseClientFactory factory, SessionSynchronization sessionSynchronization, CouchbaseConverter converter) { @@ -136,9 +32,6 @@ private static Mono doGetCouchbaseTemplate(@Nullable return getCouchbaseTemplateOrDefault(dbName, factory, converter); } - //CouchbaseResourceHolder h = (CouchbaseResourceHolder) org.springframework.transaction.support.TransactionSynchronizationManager - // .getResource(factory); - return TransactionSynchronizationManager.forCurrentTransaction() .filter(TransactionSynchronizationManager::isSynchronizationActive) // .flatMap(synchronizationManager -> { @@ -197,6 +90,7 @@ private static Mono doGetSession(TransactionSyn System.err.println("doGetSession: createClientSession()"); // init a non native MongoDB transaction by registering a MongoSessionSynchronization + // todo gp but this always returns null - does this code get executed anywhere? return createClientSession(dbFactory).map(session -> { ReactiveCouchbaseResourceHolder newHolder = new ReactiveCouchbaseResourceHolder(session); diff --git a/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseResourceHolder.java b/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseResourceHolder.java index 4e3d09d5c..00e6ded54 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseResourceHolder.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseResourceHolder.java @@ -37,7 +37,6 @@ * @see ReactiveCouchbaseTransactionManager * @see ReactiveCouchbaseTemplate */ -// todo gp understand why this is needed public class ReactiveCouchbaseResourceHolder extends ResourceHolderSupport { private @Nullable CoreTransactionAttemptContext core; // which holds the atr @@ -76,23 +75,6 @@ CoreTransactionAttemptContext getRequiredCore() { return core; } - /* - * @return the associated {@link CouchbaseClientFactory}. - ReactiveCouchbaseClientFactory getDatabaseFactory() { - return databaseFactory; - } - */ - - /** - * Set the {@link CoreTransactionAttemptContext} to guard. - * - * @param core can be {@literal null}. - */ - CoreTransactionAttemptContext setCore(@Nullable CoreTransactionAttemptContext core) { - System.err.println("setCore: " + core); - return this.core = core; - } - /** * @return {@literal true} if session is not {@literal null}. */ @@ -100,37 +82,11 @@ boolean hasCore() { return core != null; } - /** - * If the {@link ReactiveCouchbaseResourceHolder} is {@link #hasCore() not already associated} with a - * {@link CoreTransactionAttemptContext} the given value is {@link #setCore(CoreTransactionAttemptContext)} set} and - * returned, otherwise the current bound session is returned. - * - * @param core - * @return - */ - @Nullable - CoreTransactionAttemptContext setSessionIfAbsent(@Nullable CoreTransactionAttemptContext core) { - - if (!hasCore()) { - setCore(core); - } - - return this.core; - } - public boolean hasActiveTransaction() { return getCore() != null; } - public TransactionResultHolder transactionResultHolder(Integer key) { - TransactionResultHolder holder = getResultMap.get(key); - if(holder == null){ - throw new RuntimeException("did not find transactionResultHolder for key="+key+" in session"); - } - return holder; - } - public TransactionResultHolder transactionResultHolder(TransactionResultHolder holder, Object o) { System.err.println("PUT: "+System.identityHashCode(o)+" "+o); getResultMap.put(System.identityHashCode(o), holder); diff --git a/src/main/java/org/springframework/data/couchbase/transaction/ReactiveTransactionsWrapper.java b/src/main/java/org/springframework/data/couchbase/transaction/ReactiveTransactionsWrapper.java index 79b89946c..9780c3f9e 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/ReactiveTransactionsWrapper.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/ReactiveTransactionsWrapper.java @@ -15,7 +15,6 @@ import com.couchbase.client.java.transactions.TransactionResult; import com.couchbase.client.java.transactions.config.TransactionOptions; -// todo gp needed now Transactions has gone? public class ReactiveTransactionsWrapper /* wraps ReactiveTransactions */ { ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory; @@ -34,11 +33,10 @@ public Mono run(Function run(Function> transactionLogic, TransactionOptions perConfig) { - // todo gp this is duplicating a lot of logic from the core loop, and is hopefully not needed. - // todo ^^^ I think I removed all the duplicate logic. Function> newTransactionLogic = (ctx) -> { ReactiveCouchbaseResourceHolder resourceHolder = reactiveCouchbaseClientFactory.getResourceHolder( TransactionOptions.transactionOptions(), AttemptContextReactiveAccessor.getCore(ctx)); + // todo gp let's DRY any TransactionSynchronizationManager code Mono sync = TransactionContextManager.currentContext() .map(TransactionSynchronizationManager::new).flatMap(synchronizationManager -> { synchronizationManager.bindResource(reactiveCouchbaseClientFactory.getCluster(), resourceHolder); diff --git a/src/main/java/org/springframework/data/couchbase/transaction/TransactionsWrapper.java b/src/main/java/org/springframework/data/couchbase/transaction/TransactionsWrapper.java index 68243b731..26e446429 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/TransactionsWrapper.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/TransactionsWrapper.java @@ -21,7 +21,6 @@ import com.couchbase.client.java.transactions.config.TransactionOptions; import com.couchbase.client.java.transactions.error.TransactionFailedException; -// todo gp needed now Transactions has gone? public class TransactionsWrapper /* wraps Transactions */ { CouchbaseClientFactory couchbaseClientFactory; @@ -77,6 +76,7 @@ public TransactionResult run(Consumer transactionLogi logger.debug(String.format("Started transaction for session %s.", debugString(resourceHolder.getCore()))); + // todo gp let's DRY any TransactionSynchronizationManager code TransactionSynchronizationManager.setActualTransactionActive(true); resourceHolder.setSynchronizedWithTransaction(true); TransactionSynchronizationManager.unbindResourceIfPossible(couchbaseClientFactory.getCluster()); diff --git a/src/test/java/org/springframework/data/couchbase/domain/PersonWithoutVersion.java b/src/test/java/org/springframework/data/couchbase/domain/PersonWithoutVersion.java index 4aa46ed8e..d60c75621 100644 --- a/src/test/java/org/springframework/data/couchbase/domain/PersonWithoutVersion.java +++ b/src/test/java/org/springframework/data/couchbase/domain/PersonWithoutVersion.java @@ -15,176 +15,26 @@ */ package org.springframework.data.couchbase.domain; -import org.springframework.data.annotation.CreatedBy; -import org.springframework.data.annotation.CreatedDate; -import org.springframework.data.annotation.LastModifiedBy; -import org.springframework.data.annotation.LastModifiedDate; -import org.springframework.data.annotation.Transient; -import org.springframework.data.annotation.Version; -import org.springframework.data.couchbase.core.mapping.Document; -import org.springframework.data.couchbase.core.mapping.Field; -import org.springframework.data.couchbase.repository.TransactionResult; -import org.springframework.data.domain.Persistable; -import org.springframework.lang.Nullable; - import java.util.Optional; import java.util.UUID; -// todo gpx: lame to C&P the entire Person, but struggling to get a simpler entity working +import org.springframework.data.couchbase.core.mapping.Document; +import org.springframework.lang.Nullable; + @Document -public class PersonWithoutVersion extends AbstractEntity implements Persistable { +public class PersonWithoutVersion extends AbstractEntity +{ Optional firstname; - @Nullable Optional lastname; - - @CreatedBy private String creator; - - @LastModifiedBy private String lastModifiedBy; - - @LastModifiedDate private long lastModification; - - @CreatedDate private long creationDate; - - @Nullable @Field("nickname") private String middlename; - @Nullable @Field(name = "prefix") private String salutation; - - private Address address; - - // Required for use in transactions - @TransactionResult private Integer txResultHolder; - @Transient private boolean isNew; - + Optional lastname; - public PersonWithoutVersion() {} - - public PersonWithoutVersion(String firstname, String lastname) { - this(); - setFirstname(firstname); - setLastname(lastname); - setMiddlename("Nick"); - isNew(true); - } - - public PersonWithoutVersion(int id, String firstname, String lastname) { - this(firstname, lastname); - setId(new UUID(id, id)); + public PersonWithoutVersion() { + firstname = Optional.empty(); + lastname = Optional.empty(); } public PersonWithoutVersion(UUID id, String firstname, String lastname) { - this(firstname, lastname); + this.firstname = Optional.of(firstname); + this.lastname = Optional.of(lastname); setId(id); } - - static String optional(String name, Optional obj) { - if (obj != null) { - if (obj.isPresent()) { - return (" " + name + ": '" + obj.get() + "'"); - } else { - return " " + name + ": null"; - } - } - return ""; - } - - public String getFirstname() { - return firstname.get(); - } - - public void setFirstname(String firstname) { - this.firstname = firstname == null ? null : (Optional.ofNullable(firstname.equals("") ? null : firstname)); - } - - public void setFirstname(Optional firstname) { - this.firstname = firstname; - } - - public String getLastname() { - return lastname.get(); - } - - public void setLastname(String lastname) { - this.lastname = lastname == null ? null : (Optional.ofNullable(lastname.equals("") ? null : lastname)); - } - - public void setLastname(Optional lastname) { - this.lastname = lastname; - } - - public String getMiddlename() { - return middlename; - } - - public String getSalutation() { - return salutation; - } - - public void setMiddlename(String middlename) { - this.middlename = middlename; - } - - public void setSalutation(String salutation) { - this.salutation = salutation; - } - - public Address getAddress() { - return address; - } - - public void setAddress(Address address) { - this.address = address; - } - - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Person : {\n"); - sb.append(" id : " + getId()); - sb.append(optional(", firstname", firstname)); - sb.append(optional(", lastname", lastname)); - if (middlename != null) - sb.append(", middlename : '" + middlename + "'"); - if (creator != null) { - sb.append(", creator : " + creator); - } - if (creationDate != 0) { - sb.append(", creationDate : " + creationDate); - } - if (lastModifiedBy != null) { - sb.append(", lastModifiedBy : " + lastModifiedBy); - } - if (lastModification != 0) { - sb.append(", lastModification : " + lastModification); - } - if (getAddress() != null) { - sb.append(", address : " + getAddress().toString()); - } - sb.append("\n}"); - return sb.toString(); - } - - public PersonWithoutVersion withFirstName(String firstName) { - PersonWithoutVersion p = new PersonWithoutVersion(this.getId(), firstName, this.getLastname()); - p.txResultHolder = this.txResultHolder; - return p; - } - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!super.equals(obj)) { - return false; - } - - PersonWithoutVersion that = (PersonWithoutVersion) obj; - return this.getId().equals(that.getId()) && this.getFirstname().equals(that.getFirstname()) - && this.getLastname().equals(that.getLastname()) && this.getMiddlename().equals(that.getMiddlename()); - } - - @Override - public boolean isNew() { - return isNew; - } - - public void isNew(boolean isNew){ - this.isNew = isNew; - } } diff --git a/src/test/java/org/springframework/data/couchbase/transactions/Config.java b/src/test/java/org/springframework/data/couchbase/transactions/Config.java index e9193f294..8cd7c8bd5 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/Config.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/Config.java @@ -1,10 +1,5 @@ package org.springframework.data.couchbase.transactions; -import java.time.Duration; - -import com.couchbase.client.core.msg.kv.DurabilityLevel; -import com.couchbase.client.core.transaction.config.CoreTransactionsConfig; -import com.couchbase.client.java.transactions.config.TransactionsConfig; import org.springframework.context.annotation.Configuration; import org.springframework.data.couchbase.config.AbstractCouchbaseConfiguration; import org.springframework.data.couchbase.repository.config.EnableCouchbaseRepositories; @@ -12,7 +7,6 @@ import org.springframework.data.couchbase.util.ClusterAwareIntegrationTests; import org.springframework.transaction.annotation.EnableTransactionManagement; -import com.couchbase.client.java.transactions.config.TransactionOptions; @Configuration @EnableCouchbaseRepositories("org.springframework.data.couchbase") @@ -39,36 +33,4 @@ public String getPassword() { public String getBucketName() { return ClusterAwareIntegrationTests.bucketName(); } - - @Override - public TransactionsConfig.Builder transactionsConfig() { - return TransactionsConfig.builder().durabilityLevel(DurabilityLevel.NONE).timeout(Duration.ofMinutes(20));// for testing - } - - /* - @Override - public TransactionsConfig transactionConfig() { - // expirationTime 20 minutes for stepping with the debugger - return TransactionsConfig.create() - .logDirectly(Event.Severity.INFO) - .logOnFailure(true, - Event.Severity.ERROR) - .expirationTime(Duration.ofMinutes(20)) - .durabilityLevel(TransactionDurabilityLevel.MAJORITY) - .build(); - } - */ - /* - beforeAll creates a PersonService bean in the applicationContext - - context = new AnnotationConfigApplicationContext(CouchbasePersonTransactionIntegrationTests.Config.class, - PersonService.class); - - @Bean("personService") - PersonService getPersonService(CouchbaseOperations ops, CouchbaseTransactionManager mgr, - ReactiveCouchbaseOperations opsRx, ReactiveCouchbaseTransactionManager mgrRx) { - return new PersonService(ops, mgr, opsRx, mgrRx); - } - */ - } diff --git a/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionIntegrationTests.java index 2748ac525..5947d1592 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionIntegrationTests.java @@ -255,7 +255,7 @@ public void emitMultipleElementsDuringTransaction() { @Test public void errorAfterTxShouldNotAffectPreviousStep() { Person p = personService.savePerson(new Person(null, "Walter", "White")); - // todo gp user shouldn't be getting exposed to TransactionOperationFailedException + // todo gp user shouldn't be getting exposed to TransactionOperationFailedException. This is happening as TransactionOperator does not support the automatic retries and error handling we depend on. As argued on Slack, we cannot support it because of this. // todo mr /* TransactionOperationFailedException {cause:com.couchbase.client.core.error.DocumentExistsException, retry:false, autoRollback:true, raise:TRANSACTION_FAILED} @@ -662,45 +662,6 @@ public String toString() { } } - // todo gp disabled while trying to get alternative method of CouchbaseCallbackTransactionManager working - // @Configuration(proxyBeanMethods = false) - // @Role(BeanDefinition.ROLE_INFRASTRUCTURE) - // static class TransactionInterception { - // - // @Bean - // @Role(BeanDefinition.ROLE_INFRASTRUCTURE) - // public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource, - // CouchbaseTransactionManager txManager) { - // TransactionInterceptor interceptor = new CouchbaseTransactionInterceptor(); - // interceptor.setTransactionAttributeSource(transactionAttributeSource); - // if (txManager != null) { - // interceptor.setTransactionManager(txManager); - // } - // return interceptor; - // } - // - // @Bean - // @Role(BeanDefinition.ROLE_INFRASTRUCTURE) - // public TransactionAttributeSource transactionAttributeSource() { - // return new AnnotationTransactionAttributeSource(); - // } - // - // @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) - // @Role(BeanDefinition.ROLE_INFRASTRUCTURE) - // public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor( - // TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) { - // - // BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); - // advisor.setTransactionAttributeSource(transactionAttributeSource); - // advisor.setAdvice(transactionInterceptor); - // // if (this.enableTx != null) { - // // advisor.setOrder(this.enableTx.getNumber("order")); - // // } - // return advisor; - // } - // - // } - @Service @Component @EnableTransactionManagement diff --git a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTemplateTransactionIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTemplateTransactionIntegrationTests.java index 97c5f109d..4e6bb1ad9 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTemplateTransactionIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTemplateTransactionIntegrationTests.java @@ -20,7 +20,6 @@ import static org.springframework.data.couchbase.util.Util.assertInAnnotationTransaction; import com.couchbase.client.core.error.DocumentNotFoundException; -import com.example.demo.CouchbaseTransactionalTemplate; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalPropagationIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalPropagationIntegrationTests.java index 63cdc42a3..517658c85 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalPropagationIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalPropagationIntegrationTests.java @@ -16,6 +16,7 @@ package org.springframework.data.couchbase.transactions; +import com.couchbase.client.core.error.transaction.RetryTransactionException; import com.couchbase.client.java.transactions.error.TransactionFailedException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -46,18 +47,18 @@ import org.springframework.transaction.annotation.Transactional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertInTransaction; +import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction; -// todo gpx test repository methods in @Transactional -// todo gpx test queries in @Transactional -// todo gpx chekc what happens when try to do reactive @Transcational (unsupported by CallbackPreferring) -// todo gpx handle synchronization /** * Tests for the various propagation values allowed on @Transactional methods. @@ -168,8 +169,6 @@ public void callNested() { }); } - // todo gp check retries - @DisplayName("Call @Transactional that calls @Transactional(propagation = DEFAULT) - succeeds, continues existing") @Test public void callDefaultThatCallsDefault() { @@ -186,7 +185,8 @@ public void callDefaultThatCallsDefault() { }); }); - // Validate everyting committed + // Validate everything committed + assertNotNull(operations.findById(Person.class).one(id1.toString())); assertNotNull(operations.findById(Person.class).one(id2.toString())); } @@ -207,7 +207,7 @@ public void callDefaultThatCallsRequired() { }); }); - // Validate everyting committed + // Validate everything committed assertNotNull(operations.findById(Person.class).one(id1.toString())); assertNotNull(operations.findById(Person.class).one(id2.toString())); } @@ -228,7 +228,7 @@ public void callDefaultThatCallsMandatory() { }); }); - // Validate everyting committed + // Validate everything committed assertNotNull(operations.findById(Person.class).one(id1.toString())); assertNotNull(operations.findById(Person.class).one(id2.toString())); } @@ -325,6 +325,32 @@ public void callDefaultThatCallsNested() { assertNull(operations.findById(Person.class).one(id1.toString())); } + @DisplayName("Call @Transactional that calls @Transactional(propagation = DEFAULT) - check retries act correct") + @Test + public void callDefaultThatCallsDefaultRetries() { + UUID id1 = UUID.randomUUID(); + UUID id2 = UUID.randomUUID(); + AtomicInteger attempts = new AtomicInteger(); + + personService.propagationDefault(ops -> { + ops.insertById(Person.class).one(new Person(id1, "Ada", "Lovelace")); + + personService.propagationDefault(ops2 -> { + ops2.insertById(Person.class).one(new Person(id2, "Grace", "Hopper")); + assertInTransaction(); + + if (attempts.incrementAndGet() < 3) { + throw new RetryTransactionException(); + } + }); + }); + + // Validate everything committed + assertNotNull(operations.findById(Person.class).one(id1.toString())); + assertNotNull(operations.findById(Person.class).one(id2.toString())); + assertEquals(3, attempts.get()); + } + void assertInTransaction() { assertTrue(org.springframework.transaction.support.TransactionSynchronizationManager.isActualTransactionActive()); } diff --git a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalRepositoryIntegrationTests.java index 0aee620f7..b058eb294 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalRepositoryIntegrationTests.java @@ -106,7 +106,7 @@ public void save() { assertInTransaction(); // read your own write - // todo gpx this is failing because it's being executed non-transactionally, due to a bug somewhere + // todo gpx now understand why this was failing, but have concerns (see Slack) about UX User user = operations.findById(User.class).one(id); assertNotNull(user); diff --git a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalTemplateIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalTemplateIntegrationTests.java index 202e63d3e..e10d8d6bf 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/CouchbaseTransactionalTemplateIntegrationTests.java @@ -32,6 +32,8 @@ import org.springframework.data.couchbase.core.CouchbaseOperations; import org.springframework.data.couchbase.core.CouchbaseTemplate; import org.springframework.data.couchbase.core.ReactiveCouchbaseOperations; +import org.springframework.data.couchbase.core.RemoveResult; +import org.springframework.data.couchbase.core.query.QueryCriteria; import org.springframework.data.couchbase.domain.Person; import org.springframework.data.couchbase.domain.PersonWithoutVersion; import org.springframework.data.couchbase.transaction.CouchbaseSimpleCallbackTransactionManager; @@ -45,8 +47,11 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Transactional; +import javax.management.Query; +import javax.management.ValueExp; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -147,6 +152,37 @@ public void committedRemove() { assertEquals(1, tryCount.get()); } + @DisplayName("A basic golden path removeByQuery should succeed") + @Test + public void committedRemoveByQuery() { + AtomicInteger tryCount = new AtomicInteger(0); + Person person = new Person(1, "Walter", "White"); + operations.insertById(Person.class).one(person); + + List removed = personService.doInTransaction(tryCount, ops -> { + return ops.removeByQuery(Person.class).matching(QueryCriteria.where("firstname").eq("Walter")).all(); + }); + + Person fetched = operations.findById(Person.class).one(person.getId().toString()); + assertNull(fetched); + assertEquals(1, tryCount.get()); + assertEquals(1, removed.size()); + } + + @DisplayName("A basic golden path findByQuery should succeed (though we don't know for sure it executed transactionally)") + @Test + public void committedFindByQuery() { + AtomicInteger tryCount = new AtomicInteger(0); + Person person = new Person(1, "Walter", "White"); + operations.insertById(Person.class).one(person); + + List found = personService.doInTransaction(tryCount, ops -> { + return ops.findByQuery(Person.class).matching(QueryCriteria.where("firstname").eq("Walter")).all(); + }); + + assertEquals(1, found.size()); + } + @DisplayName("Basic test of doing an insert then rolling back") @Test public void rollbackInsert() { @@ -220,6 +256,50 @@ public void rollbackRemove() { assertEquals(1, tryCount.get()); } + + @DisplayName("Basic test of doing a removeByQuery then rolling back") + @Test + public void rollbackRemoveByQuery() { + AtomicInteger tryCount = new AtomicInteger(0); + Person person = new Person(1, "Walter", "White"); + operations.insertById(Person.class).one(person); + + try { + personService.doInTransaction(tryCount, ops -> { + // todo gpx this isn't executed transactionally + ops.removeByQuery(Person.class).matching(QueryCriteria.where("firstname").eq("Walter")).all(); + throw new SimulateFailureException(); + }); + fail(); + } catch (TransactionFailedException err) { + assertTrue(err.getCause() instanceof SimulateFailureException); + } + + Person fetched = operations.findById(Person.class).one(person.getId().toString()); + assertNotNull(fetched); + assertEquals(1, tryCount.get()); + } + + @DisplayName("Basic test of doing a findByQuery then rolling back") + @Test + public void rollbackFindByQuery() { + AtomicInteger tryCount = new AtomicInteger(0); + Person person = new Person(1, "Walter", "White"); + operations.insertById(Person.class).one(person); + + try { + personService.doInTransaction(tryCount, ops -> { + ops.findByQuery(Person.class).matching(QueryCriteria.where("firstname").eq("Walter")).all(); + throw new SimulateFailureException(); + }); + fail(); + } catch (TransactionFailedException err) { + assertTrue(err.getCause() instanceof SimulateFailureException); + } + + assertEquals(1, tryCount.get()); + } + @Test public void shouldRollbackAfterException() { try { @@ -270,35 +350,34 @@ public void concurrentTxns() { } // todo gpx investigate how @Transactional @Rollback/@Commit interacts with us - // todo gpx how to provide per-transaction options? - @Disabled("taking too long - must fix") @DisplayName("Create a Person outside a @Transactional block, modify it, and then replace that person in the @Transactional. The transaction will retry until timeout.") @Test public void replacePerson() { Person person = new Person(1, "Walter", "White"); operations.insertById(Person.class).one(person); - System.out.printf("insert CAS: %s%n", person.getVersion()); - Person refetched = operations.findById(Person.class).one(person.getId().toString()); operations.replaceById(Person.class).one(refetched); - System.out.printf("replace CAS: %s%n", refetched.getVersion()); - assertNotEquals(person.getVersion(), refetched.getVersion()); AtomicInteger tryCount = new AtomicInteger(0); - // todo gpx this is raising incorrect error: - // com.couchbase.client.core.retry.reactor.RetryExhaustedException: com.couchbase.client.core.error.transaction.RetryTransactionException: User request to retry transaction - personService.replace(person, tryCount); + try { + personService.replace(person, tryCount); + fail(); + } + catch (TransactionFailedException ignored) { + } + } @DisplayName("Entity must have CAS field during replace") @Test public void replaceEntityWithoutCas() { - PersonWithoutVersion person = new PersonWithoutVersion(1, "Walter", "White"); + PersonWithoutVersion person = new PersonWithoutVersion(UUID.randomUUID(), "Walter", "White"); + operations.insertById(PersonWithoutVersion.class).one(person); try { personService.replaceEntityWithoutVersion(person.getId().toString()); @@ -329,7 +408,8 @@ public void replaceEntityWithCasZero() { @DisplayName("Entity must have CAS field during remove") @Test public void removeEntityWithoutCas() { - PersonWithoutVersion person = new PersonWithoutVersion(1, "Walter", "White"); + PersonWithoutVersion person = new PersonWithoutVersion(UUID.randomUUID(), "Walter", "White"); + operations.insertById(PersonWithoutVersion.class).one(person); try { personService.removeEntityWithoutVersion(person.getId().toString()); @@ -442,12 +522,8 @@ public Person declarativeFindReplaceTwicePersonCallback(Person person, AtomicInt return personOperations.replaceById(Person.class).one(pUpdated); } - // todo gpx how do we make COUCHBASE_SIMPLE_CALLBACK_TRANSACTION_MANAGER the default so user only has to specify @Transactional, without the transactionManager? - // todo mr - // todo if there is exactly one bean of type ‘org.springframework.transaction.TransactionManager’. - // todo It’s also possible to put the @Transaction annotation on the class (instead of each method). - // todo see TransactionAspectSupport.determineTransactionManager(TransactionAttribute) - @Transactional(transactionManager = BeanNames.COUCHBASE_SIMPLE_CALLBACK_TRANSACTION_MANAGER) + @Transactional(transactionManager = BeanNames.COUCHBASE_SIMPLE_CALLBACK_TRANSACTION_MANAGER, timeout = 2) + public Person replace(Person person, AtomicInteger tryCount) { assertInAnnotationTransaction(true); tryCount.incrementAndGet(); diff --git a/src/test/java/org/springframework/data/couchbase/util/ClusterAwareIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/util/ClusterAwareIntegrationTests.java index 5deab4ba9..842cae6cd 100644 --- a/src/test/java/org/springframework/data/couchbase/util/ClusterAwareIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/util/ClusterAwareIntegrationTests.java @@ -64,7 +64,7 @@ public abstract class ClusterAwareIntegrationTests { @BeforeAll static void setup(TestClusterConfig config) { testClusterConfig = config; - // todo gp disabling cleanupLostAttempts to simplify output during development + // Disabling cleanupLostAttempts to simplify output during development ClusterEnvironment env = ClusterEnvironment.builder() .transactionsConfig(TransactionsConfig.cleanupConfig(TransactionsCleanupConfig.cleanupLostAttempts(false))) .build();