Skip to content

Commit a226f88

Browse files
committed
Support for @transactional for blocking and reactive Transactionmanager.
Closes 1145.
1 parent 3cc9af0 commit a226f88

File tree

102 files changed

+7896
-615
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+7896
-615
lines changed

pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
<version>${couchbase-transactions}</version>
4646
</dependency>
4747

48-
4948
<dependency>
5049
<groupId>org.springframework</groupId>
5150
<artifactId>spring-context-support</artifactId>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
/*
3+
* Copyright 2021 the original author or authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.couchbase.transactions;
18+
19+
/**
20+
* To access the AttemptContextReactive held by AttemptContext
21+
*
22+
* @author Michael Reiche
23+
*/
24+
public class AttemptContextReactiveAccessor {
25+
26+
public static AttemptContextReactive getACR(AttemptContext attemptContext) {
27+
return attemptContext.ctx();
28+
}
29+
}

src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818

1919
import java.io.Closeable;
2020

21+
import com.couchbase.transactions.AttemptContextReactive;
22+
import com.couchbase.transactions.Transactions;
23+
import com.couchbase.transactions.config.TransactionConfig;
2124
import org.springframework.dao.support.PersistenceExceptionTranslator;
2225

2326
import com.couchbase.client.java.Bucket;
2427
import com.couchbase.client.java.Cluster;
2528
import com.couchbase.client.java.Collection;
2629
import com.couchbase.client.java.Scope;
30+
import org.springframework.data.couchbase.transaction.ClientSession;
31+
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
32+
import org.springframework.data.couchbase.transaction.CouchbaseStuffHandle;
2733

2834
/**
2935
* The {@link CouchbaseClientFactory} is the main way to get access to the managed SDK instance and resources.
@@ -73,4 +79,10 @@ public interface CouchbaseClientFactory extends Closeable {
7379
*/
7480
PersistenceExceptionTranslator getExceptionTranslator();
7581

82+
ClientSession getSession(ClientSessionOptions options, Transactions transactions,
83+
TransactionConfig config , AttemptContextReactive atr);
84+
85+
CouchbaseClientFactory with(CouchbaseStuffHandle txOp);
86+
87+
CouchbaseStuffHandle getTransactionalOperator();
7688
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2016-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.couchbase;
17+
18+
import com.couchbase.client.java.Bucket;
19+
import com.couchbase.client.java.Cluster;
20+
import com.couchbase.client.java.ClusterInterface;
21+
import com.couchbase.client.java.Collection;
22+
import com.couchbase.client.java.Scope;
23+
import com.couchbase.transactions.AttemptContextReactive;
24+
import com.couchbase.transactions.Transactions;
25+
import com.couchbase.transactions.config.TransactionConfig;
26+
import org.springframework.data.couchbase.transaction.ClientSession;
27+
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
28+
import org.springframework.data.couchbase.transaction.CouchbaseStuffHandle;
29+
import reactor.core.publisher.Mono;
30+
31+
import org.springframework.dao.support.PersistenceExceptionTranslator;
32+
33+
import java.io.IOException;
34+
35+
36+
/**
37+
* Interface for factories creating reactive {@link Cluster} instances.
38+
*
39+
* @author Mark Paluch
40+
* @author Christoph Strobl
41+
* @author Mathieu Ouellet
42+
* @since 2.0
43+
*/
44+
public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*/ {
45+
46+
/**
47+
* Provides access to the managed SDK {@link Cluster} reference.
48+
*/
49+
//Cluster getCluster();
50+
51+
Mono<ClusterInterface> getCluster();
52+
53+
/**
54+
* Provides access to the managed SDK {@link Bucket} reference.
55+
*/
56+
Mono<Bucket> getBucket();
57+
58+
/**
59+
* Provides access to the managed SDK {@link Scope} reference.
60+
*/
61+
//Scope getScope();
62+
63+
Mono<Scope> getScope();
64+
65+
/**
66+
* Provides access to a collection (identified by its name) in managed SDK {@link Scope} reference.
67+
*
68+
* @param name the name of the collection. If null is passed in, the default collection is assumed.
69+
*/
70+
//Collection getCollection(String name);
71+
72+
Mono<Collection> getCollection(String name);
73+
/**
74+
* Provides access to the default collection.
75+
*/
76+
Mono<Collection> getDefaultCollection();
77+
78+
/**
79+
* Returns a new {@link CouchbaseClientFactory} set to the scope given as an argument.
80+
*
81+
* @param scopeName the name of the scope to use for all collection access.
82+
* @return a new client factory, bound to the other scope.
83+
*/
84+
ReactiveCouchbaseClientFactory withScope(String scopeName);
85+
86+
/**
87+
* The exception translator used on the factory.
88+
*/
89+
PersistenceExceptionTranslator getExceptionTranslator();
90+
91+
Mono<ClientSession> getSession(ClientSessionOptions options, Transactions transactions, TransactionConfig config);
92+
93+
String getBucketName();
94+
95+
String getScopeName();
96+
97+
void close() throws IOException;
98+
99+
Mono<ClientSession> getSession(ClientSessionOptions options);
100+
101+
ClientSession getSession(ClientSessionOptions options, Transactions transactions, TransactionConfig config,
102+
AttemptContextReactive atr);
103+
104+
/*
105+
* (non-Javadoc)
106+
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#withSession(com.mongodb.session.ClientSession)
107+
*/
108+
ReactiveCouchbaseClientFactory withSession(ClientSession session);
109+
110+
/*
111+
* (non-Javadoc)
112+
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#isTransactionActive()
113+
*/
114+
boolean isTransactionActive();
115+
116+
CouchbaseStuffHandle getTransactionalOperator();
117+
118+
ReactiveCouchbaseClientFactory with(CouchbaseStuffHandle txOp);
119+
}

src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,23 @@
1515
*/
1616
package org.springframework.data.couchbase;
1717

18+
import java.time.Duration;
19+
import java.time.temporal.ChronoUnit;
20+
import java.util.Optional;
21+
import java.util.UUID;
1822
import java.util.function.Supplier;
1923

24+
import com.couchbase.transactions.AttemptContext;
25+
import com.couchbase.transactions.TransactionContext;
26+
import com.couchbase.transactions.config.MergedTransactionConfig;
27+
import com.couchbase.transactions.config.PerTransactionConfig;
28+
import com.couchbase.transactions.config.PerTransactionConfigBuilder;
2029
import org.springframework.dao.support.PersistenceExceptionTranslator;
2130
import org.springframework.data.couchbase.core.CouchbaseExceptionTranslator;
31+
import org.springframework.data.couchbase.transaction.ClientSession;
32+
import org.springframework.data.couchbase.transaction.ClientSessionImpl;
33+
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
34+
import org.springframework.data.couchbase.transaction.CouchbaseStuffHandle;
2235

2336
import com.couchbase.client.core.env.Authenticator;
2437
import com.couchbase.client.core.env.OwnedSupplier;
@@ -29,6 +42,9 @@
2942
import com.couchbase.client.java.Collection;
3043
import com.couchbase.client.java.Scope;
3144
import com.couchbase.client.java.env.ClusterEnvironment;
45+
import com.couchbase.transactions.AttemptContextReactive;
46+
import com.couchbase.transactions.Transactions;
47+
import com.couchbase.transactions.config.TransactionConfig;
3248

3349
/**
3450
* The default implementation of a {@link CouchbaseClientFactory}.
@@ -42,6 +58,7 @@ public class SimpleCouchbaseClientFactory implements CouchbaseClientFactory {
4258
private final Bucket bucket;
4359
private final Scope scope;
4460
private final PersistenceExceptionTranslator exceptionTranslator;
61+
private final CouchbaseStuffHandle transactionalOperator;
4562

4663
public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
4764
final String bucketName) {
@@ -68,10 +85,16 @@ public SimpleCouchbaseClientFactory(final Cluster cluster, final String bucketNa
6885

6986
private SimpleCouchbaseClientFactory(final Supplier<Cluster> cluster, final String bucketName,
7087
final String scopeName) {
88+
this(cluster, bucketName, scopeName, null);
89+
}
90+
91+
private SimpleCouchbaseClientFactory(final Supplier<Cluster> cluster, final String bucketName, final String scopeName,
92+
final CouchbaseStuffHandle transactionalOperator) {
7193
this.cluster = cluster;
7294
this.bucket = cluster.get().bucket(bucketName);
7395
this.scope = scopeName == null ? bucket.defaultScope() : bucket.scope(scopeName);
7496
this.exceptionTranslator = new CouchbaseExceptionTranslator();
97+
this.transactionalOperator = transactionalOperator;
7598
}
7699

77100
@Override
@@ -116,11 +139,51 @@ public PersistenceExceptionTranslator getExceptionTranslator() {
116139
return exceptionTranslator;
117140
}
118141

142+
@Override
143+
public ClientSession getSession(ClientSessionOptions options, Transactions transactions, TransactionConfig config,
144+
AttemptContextReactive atr) {
145+
146+
AttemptContext at = AttemptContext.from( atr != null ? atr : transactions.reactive().newAttemptContextReactive());
147+
148+
return new ClientSessionImpl(this, transactions, config, at);
149+
}
150+
151+
/* copied from AttemptContextReactive - needs to have cleanup() and createAttemptContext() public
152+
153+
public AttemptContextReactive newAttemptContextReactive(Transactions transactions, TransactionConfig config){
154+
PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build();
155+
MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.of(perConfig));
156+
157+
TransactionContext overall = new TransactionContext(
158+
transactions.reactive().cleanup().clusterData().cluster().environment().requestTracer(),
159+
transactions.reactive().cleanup().clusterData().cluster().environment().eventBus(),
160+
UUID.randomUUID().toString(), now(), Duration.ZERO, merged);
161+
162+
String txnId = UUID.randomUUID().toString();
163+
//overall.LOGGER.info(configDebug(config, perConfig));
164+
return transactions.reactive().createAttemptContext(overall, merged, txnId);
165+
}
166+
*/
167+
168+
@Override
169+
public CouchbaseClientFactory with(CouchbaseStuffHandle txOp) {
170+
return new SimpleCouchbaseClientFactory(cluster, bucket.name(), scope.name(), txOp);
171+
}
172+
173+
@Override
174+
public CouchbaseStuffHandle getTransactionalOperator() {
175+
return (CouchbaseStuffHandle) transactionalOperator;
176+
}
177+
119178
@Override
120179
public void close() {
121180
if (cluster instanceof OwnedSupplier) {
122181
cluster.get().disconnect();
123182
}
124183
}
125184

185+
private static Duration now() {
186+
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
187+
}
188+
126189
}

0 commit comments

Comments
 (0)