Skip to content

Commit 4e93e55

Browse files
committed
Commit before pulling Graham's changes.
1 parent 95bf2fc commit 4e93e55

20 files changed

+636
-439
lines changed

pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,12 @@
292292
<enabled>false</enabled>
293293
</releases>
294294
</repository>
295+
<!--
295296
<repository>
296297
<id>jitpack.io</id>
297298
<url>https://jitpack.io</url>
298299
</repository>
300+
-->
299301
</repositories>
300302

301303
<pluginRepositories>
@@ -327,6 +329,7 @@
327329
<groupId>org.apache.maven.plugins</groupId>
328330
<artifactId>maven-failsafe-plugin</artifactId>
329331
<configuration>
332+
<useModulePath>false</useModulePath>
330333
<includes>
331334
<include>**/*IntegrationTest.java</include>
332335
<include>**/*IntegrationTests.java</include>

src/main/java/com/couchbase/transactions/AttemptContextReactiveAccessor.java

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
*/
1717
package com.couchbase.transactions;
1818

19+
import java.time.Duration;
20+
import java.time.temporal.ChronoUnit;
21+
import java.util.Optional;
22+
import java.util.UUID;
23+
1924
import com.couchbase.client.core.annotation.Stability;
2025
import com.couchbase.transactions.config.MergedTransactionConfig;
2126
import com.couchbase.transactions.config.PerTransactionConfig;
@@ -24,67 +29,68 @@
2429
import com.couchbase.transactions.forwards.Supported;
2530
import com.couchbase.transactions.log.TransactionLogger;
2631

27-
import java.time.Duration;
28-
import java.time.temporal.ChronoUnit;
29-
import java.util.Optional;
30-
import java.util.UUID;
31-
3232
/**
3333
* To access the AttemptContextReactive held by AttemptContext
3434
*
3535
* @author Michael Reiche
3636
*/
3737
public class AttemptContextReactiveAccessor {
3838

39-
public static AttemptContextReactive getACR(AttemptContext attemptContext) {
40-
return attemptContext.ctx();
41-
}
39+
public static AttemptContextReactive getACR(AttemptContext attemptContext) {
40+
return attemptContext.ctx();
41+
}
42+
43+
public static AttemptContext from(AttemptContextReactive attemptContextReactive) {
44+
return new AttemptContext(attemptContextReactive);
45+
}
4246

43-
public static AttemptContext from(AttemptContextReactive attemptContextReactive) {
44-
return new AttemptContext(attemptContextReactive);
45-
}
47+
public static TransactionLogger getLogger(AttemptContextReactive attemptContextReactive) {
48+
return attemptContextReactive.LOGGER;
49+
}
4650

47-
public static TransactionLogger getLogger(AttemptContextReactive attemptContextReactive){
48-
return attemptContextReactive.LOGGER;
49-
}
50-
@Stability.Internal
51-
public static AttemptContextReactive newAttemptContextReactive(TransactionsReactive transactions){
52-
PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build();
53-
MergedTransactionConfig merged = new MergedTransactionConfig(transactions.config(), Optional.of(perConfig));
51+
@Stability.Internal
52+
public static AttemptContextReactive newAttemptContextReactive(TransactionsReactive transactions) {
53+
PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build();
54+
MergedTransactionConfig merged = new MergedTransactionConfig(transactions.config(), Optional.of(perConfig));
5455

55-
TransactionContext overall = new TransactionContext(
56-
transactions.cleanup().clusterData().cluster().environment().requestTracer(),
57-
transactions.cleanup().clusterData().cluster().environment().eventBus(),
58-
UUID.randomUUID().toString(), now(), Duration.ZERO, merged);
56+
TransactionContext overall = new TransactionContext(
57+
transactions.cleanup().clusterData().cluster().environment().requestTracer(),
58+
transactions.cleanup().clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), now(),
59+
Duration.ZERO, merged);
5960

60-
String txnId = UUID.randomUUID().toString();
61-
overall.LOGGER.info(configDebug(transactions.config(), perConfig));
62-
return transactions.createAttemptContext(overall, merged, txnId);
63-
}
61+
String txnId = UUID.randomUUID().toString();
62+
overall.LOGGER.info(configDebug(transactions.config(), perConfig));
63+
return newAttemptContextReactive(transactions, overall, merged, txnId);
64+
}
6465

65-
private static Duration now() {
66-
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
67-
}
66+
@Stability.Internal
67+
public static AttemptContextReactive newAttemptContextReactive(TransactionsReactive reactive,
68+
TransactionContext overall, MergedTransactionConfig merged, String txnId) {
69+
return reactive.createAttemptContext(overall, merged, txnId);
70+
}
6871

69-
static private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
70-
StringBuilder sb = new StringBuilder();
71-
sb.append("library version: ");
72-
sb.append(TransactionsReactive.class.getPackage().getImplementationVersion());
73-
sb.append(" config: ");
74-
sb.append("atrs=");
75-
sb.append(config.numAtrs());
76-
sb.append(", metadataCollection=");
77-
sb.append(config.metadataCollection());
78-
sb.append(", expiry=");
79-
sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis());
80-
sb.append("msecs durability=");
81-
sb.append(config.durabilityLevel());
82-
sb.append(" per-txn config=");
83-
sb.append(" durability=");
84-
sb.append(perConfig.durabilityLevel());
85-
sb.append(", supported=");
86-
sb.append(Supported.SUPPORTED);
87-
return sb.toString();
88-
}
72+
private static Duration now() {
73+
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
74+
}
8975

76+
static private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
77+
StringBuilder sb = new StringBuilder();
78+
sb.append("library version: ");
79+
sb.append(TransactionsReactive.class.getPackage().getImplementationVersion());
80+
sb.append(" config: ");
81+
sb.append("atrs=");
82+
sb.append(config.numAtrs());
83+
sb.append(", metadataCollection=");
84+
sb.append(config.metadataCollection());
85+
sb.append(", expiry=");
86+
sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis());
87+
sb.append("msecs durability=");
88+
sb.append(config.durabilityLevel());
89+
sb.append(" per-txn config=");
90+
sb.append(" durability=");
91+
sb.append(perConfig.durabilityLevel());
92+
sb.append(", supported=");
93+
sb.append(Supported.SUPPORTED);
94+
return sb.toString();
95+
}
9096
}

src/main/java/com/couchbase/transactions/TransactionsReactive.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ private Mono<AttemptContextReactive> executeImplicitCommit(AttemptContextReactiv
364364
}
365365

366366
// TODO: changed from package-protected to public (could have just used an accessor class in same package)
367-
public AttemptContextReactive createAttemptContext(TransactionContext overall,
367+
AttemptContextReactive createAttemptContext(TransactionContext overall,
368368
MergedTransactionConfig config,
369369
String attemptId) {
370370
// null only happens in testing with Mockito, harmless

src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*
4646
/**
4747
* Provides access to the managed SDK {@link Cluster} reference.
4848
*/
49-
//Cluster getCluster();
50-
5149
Mono<ClusterInterface> getCluster();
5250

5351
/**
@@ -58,18 +56,15 @@ public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*
5856
/**
5957
* Provides access to the managed SDK {@link Scope} reference.
6058
*/
61-
//Scope getScope();
62-
6359
Mono<Scope> getScope();
6460

6561
/**
6662
* Provides access to a collection (identified by its name) in managed SDK {@link Scope} reference.
6763
*
6864
* @param name the name of the collection. If null is passed in, the default collection is assumed.
6965
*/
70-
//Collection getCollection(String name);
71-
7266
Mono<Collection> getCollection(String name);
67+
7368
/**
7469
* Provides access to the default collection.
7570
*/
@@ -113,7 +108,4 @@ ClientSession getSession(ClientSessionOptions options, Transactions transactions
113108
*/
114109
boolean isTransactionActive();
115110

116-
//CouchbaseStuffHandle getTransactionalOperator();
117-
118-
//ReactiveCouchbaseClientFactory with(CouchbaseStuffHandle txOp);
119111
}

src/main/java/org/springframework/data/couchbase/config/AbstractCouchbaseConfiguration.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.context.annotation.Bean;
3131
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
3232
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.context.annotation.Role;
3334
import org.springframework.core.type.filter.AnnotationTypeFilter;
3435
import org.springframework.data.convert.CustomConversions;
3536
import org.springframework.data.couchbase.CouchbaseClientFactory;
@@ -49,9 +50,15 @@
4950
import org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager;
5051
import org.springframework.data.couchbase.transaction.CouchbaseTransactionManager;
5152
import org.springframework.data.couchbase.transaction.ReactiveCouchbaseTransactionManager;
53+
import org.springframework.data.couchbase.transaction.interceptor.CouchbaseTransactionInterceptor;
5254
import org.springframework.data.mapping.model.CamelCaseAbbreviatingFieldNamingStrategy;
5355
import org.springframework.data.mapping.model.FieldNamingStrategy;
5456
import org.springframework.data.mapping.model.PropertyNameFieldNamingStrategy;
57+
import org.springframework.transaction.annotation.AnnotationTransactionAttributeSource;
58+
import org.springframework.transaction.config.TransactionManagementConfigUtils;
59+
import org.springframework.transaction.interceptor.BeanFactoryTransactionAttributeSourceAdvisor;
60+
import org.springframework.transaction.interceptor.TransactionAttributeSource;
61+
import org.springframework.transaction.interceptor.TransactionInterceptor;
5562
import org.springframework.util.ClassUtils;
5663
import org.springframework.util.StringUtils;
5764

@@ -436,4 +443,31 @@ public QueryScanConsistency getDefaultConsistency() {
436443
return null;
437444
}
438445

446+
@Bean
447+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
448+
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
449+
TransactionInterceptor interceptor = new CouchbaseTransactionInterceptor();
450+
interceptor.setTransactionAttributeSource(transactionAttributeSource);
451+
return interceptor;
452+
}
453+
454+
@Bean
455+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
456+
public TransactionAttributeSource transactionAttributeSource() {
457+
return new AnnotationTransactionAttributeSource();
458+
}
459+
460+
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
461+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
462+
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
463+
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
464+
465+
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
466+
advisor.setTransactionAttributeSource(transactionAttributeSource);
467+
advisor.setAdvice(transactionInterceptor);
468+
// if (this.enableTx != null) {
469+
// advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
470+
// }
471+
return advisor;
472+
}
439473
}

src/main/java/org/springframework/data/couchbase/core/ReactiveReplaceByIdOperationSupport.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ public Mono<T> one(T object) {
128128
CouchbaseDocument converted = support.encodeEntity(object).block();
129129
reactiveEntity = tmpl.flatMap(tp -> tp.getCouchbaseClientFactory().getSession(null).flatMap(s -> {
130130
if (s == null || s.getAttemptContextReactive() == null) {
131-
System.err.println("ReactiveReplaceById: not");
132131
Mono<com.couchbase.client.java.Collection> op = template.getCouchbaseClientFactory()
133132
.withScope(pArgs.getScope()).getCollection(pArgs.getCollection());
134133
return op.flatMap(collection -> collection.reactive()

src/main/java/org/springframework/data/couchbase/repository/support/CrudMethodMetadataPostProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
168168
try {
169169
return invocation.proceed();
170170
} finally {
171-
// TransactionSynchronizationManager.unbindResource(method);
171+
TransactionSynchronizationManager.unbindResource(method);
172172
}
173173
} finally {
174-
// currentInvocation.set(oldInvocation);
174+
currentInvocation.set(oldInvocation);
175175
}
176176
}
177177
}

src/main/java/org/springframework/data/couchbase/repository/support/SimpleReactiveCouchbaseRepository.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.data.couchbase.repository.support;
1818

19+
import com.couchbase.client.java.CommonOptions;
1920
import reactor.core.publisher.Flux;
2021
import reactor.core.publisher.Mono;
2122

@@ -74,6 +75,24 @@ public SimpleReactiveCouchbaseRepository(CouchbaseEntityInformation<T, String> e
7475
@SuppressWarnings("unchecked")
7576
@Override
7677
public <S extends T> Mono<S> save(S entity) {
78+
return save(entity, getScope(), getCollection());
79+
}
80+
81+
@Override
82+
public Flux<T> findAll(Sort sort) {
83+
return findAll(new Query().with(sort));
84+
}
85+
86+
@Override
87+
public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
88+
Assert.notNull(entities, "The given Iterable of entities must not be null!");
89+
String scopeName = getScope();
90+
String collection = getCollection();
91+
return Flux.fromIterable(entities).flatMap(e -> save(e, scopeName, collection));
92+
}
93+
94+
@SuppressWarnings("unchecked")
95+
public <S extends T> Mono<S> save(S entity, String scopeName, String collectionName) {
7796
Assert.notNull(entity, "Entity must not be null!");
7897
Mono<S> result;
7998
final CouchbasePersistentEntity<?> mapperEntity = operations.getConverter().getMappingContext()
@@ -86,31 +105,20 @@ public <S extends T> Mono<S> save(S entity) {
86105

87106
if (!versionPresent) { // the entity doesn't have a version property
88107
// No version field - no cas
89-
result = (Mono<S>) operations.upsertById(getJavaType()).inScope(getScope()).inCollection(getCollection())
108+
result = (Mono<S>) operations.upsertById(getJavaType()).inScope(scopeName).inCollection(collectionName)
90109
.one(entity);
91110
} else if (existingDocument) { // there is a version property, and it is non-zero
92111
// Updating existing document with cas
93-
result = (Mono<S>) operations.replaceById(getJavaType()).inScope(getScope()).inCollection(getCollection())
112+
result = (Mono<S>) operations.replaceById(getJavaType()).inScope(scopeName).inCollection(collectionName)
94113
.one(entity);
95114
} else { // there is a version property, but it's zero or not set.
96115
// Creating new document
97-
result = (Mono<S>) operations.insertById(getJavaType()).inScope(getScope()).inCollection(getCollection())
116+
result = (Mono<S>) operations.insertById(getJavaType()).inScope(scopeName).inCollection(collectionName)
98117
.one(entity);
99118
}
100119
return result;
101120
}
102121

103-
@Override
104-
public Flux<T> findAll(Sort sort) {
105-
return findAll(new Query().with(sort));
106-
}
107-
108-
@Override
109-
public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
110-
Assert.notNull(entities, "The given Iterable of entities must not be null!");
111-
return Flux.fromIterable(entities).flatMap(e -> save(e));
112-
}
113-
114122
@Override
115123
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
116124
Assert.notNull(entityStream, "The given Iterable of entities must not be null!");

src/main/java/org/springframework/data/couchbase/repository/support/TransactionResultHolder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class TransactionResultHolder {
3030
SingleQueryTransactionResult singleQueryResult;
3131

3232
public TransactionResultHolder(TransactionGetResult getResult) {
33-
// we don't need the content and we don't have access to the transcoder an txnMeta (and we don't need them either).
33+
// we don't need the content and we don't have access to the transcoder and txnMeta (and we don't need them either).
3434
this.getResult = new TransactionGetResult(getResult.id(), null, getResult.cas(), getResult.collection(),
3535
getResult.links(), getResult.status(), getResult.documentMetadata(), null, null);
3636
this.singleQueryResult = null;
@@ -46,6 +46,6 @@ public TransactionGetResult transactionGetResult() {
4646
}
4747

4848
public SingleQueryTransactionResult singleQueryResult() {
49-
return singleQueryResult();
49+
return singleQueryResult;
5050
}
5151
}

src/main/java/org/springframework/data/couchbase/transaction/ReactiveCouchbaseClientUtils.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,16 @@ private static Mono<ReactiveCouchbaseTemplate> doGetCouchbaseTemplate(@Nullable
138138

139139
//CouchbaseResourceHolder h = (CouchbaseResourceHolder) org.springframework.transaction.support.TransactionSynchronizationManager
140140
// .getResource(factory);
141-
TransactionSynchronizationManager.forCurrentTransaction()
142-
.flatMap((synchronizationManager) -> { System.out.println(synchronizationManager.getResource(factory)); return null; });
143141

144142
return TransactionSynchronizationManager.forCurrentTransaction()
145-
.flatMap(x -> { System.err.println("forCurrentTransaction: getResource() : "+x.getResource(factory.getCluster().block())); return Mono.just(x);})
143+
.flatMap(x -> {/* System.err.println("forCurrentTransaction: getResource() : "+x.getResource(factory.getCluster().block()));*/ return Mono.just(x);})
146144
.filter(TransactionSynchronizationManager::isSynchronizationActive) //
147145
.flatMap(synchronizationManager -> {
148146
return doGetSession(synchronizationManager, factory, sessionSynchronization) //
149147
.flatMap(it -> getCouchbaseTemplateOrDefault(dbName, factory.withSession(it), converter)); // rx TxMgr
150148
}) //
151149
.onErrorResume(NoTransactionException.class,
152-
e -> { System.err.println("noCurrentTransaction: "); return getCouchbaseTemplateOrDefault(dbName,
150+
e -> { /* System.err.println("noCurrentTransaction: "); */return getCouchbaseTemplateOrDefault(dbName,
153151
getNonReactiveSession(factory) != null ? factory.withSession(getNonReactiveSession(factory)) : factory,
154152
converter);}) // blocking TxMgr
155153
.switchIfEmpty(getCouchbaseTemplateOrDefault(dbName, factory, converter));
@@ -162,7 +160,7 @@ private static ClientSession getNonReactiveSession(ReactiveCouchbaseClientFactor
162160
h = ((CouchbaseResourceHolder) org.springframework.transaction.support.TransactionSynchronizationManager
163161
.getResource(factory));// MN's CouchbaseTransactionManager
164162
}
165-
System.err.println("getNonreactiveSession: "+ h);
163+
//System.err.println("getNonreactiveSession: "+ h);
166164
return h != null ? h.getSession() : null;
167165
}
168166

src/main/java/org/springframework/data/couchbase/transaction/TransactionsWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public Mono<TransactionResult> run(Function<AttemptContextReactive, Mono<Void>>
6767
Mono<AttemptContextReactive> ob = Mono.fromCallable(() -> {
6868
String txnId = UUID.randomUUID().toString();
6969
overall.LOGGER.info(configDebug(config, perConfig));
70-
return transactions.reactive().createAttemptContext(overall, merged, txnId);
70+
return AttemptContextReactiveAccessor.newAttemptContextReactive(transactions.reactive(),overall, merged, txnId);
7171
}).flatMap(ctx -> {
7272

7373
AttemptContextReactiveAccessor.getLogger(ctx).info("starting attempt %d/%s/%s",

0 commit comments

Comments
 (0)