Skip to content

Commit 3a33f44

Browse files
committed
Support for @transactional for blocking and reactive Transactionmanager.
Closes 1145.
1 parent 1679651 commit 3a33f44

File tree

96 files changed

+7117
-559
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+7117
-559
lines changed

pom.xml

+27-2
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,13 @@
3939

4040
<dependencies>
4141

42+
<!--
4243
<dependency>
4344
<groupId>com.couchbase.client</groupId>
4445
<artifactId>couchbase-transactions</artifactId>
4546
<version>${couchbase-transactions}</version>
4647
</dependency>
47-
48+
-->
4849

4950
<dependency>
5051
<groupId>org.springframework</groupId>
@@ -66,12 +67,13 @@
6667
<artifactId>spring-data-commons</artifactId>
6768
<version>${springdata.commons}</version>
6869
</dependency>
69-
70+
<!--
7071
<dependency>
7172
<groupId>com.couchbase.client</groupId>
7273
<artifactId>java-client</artifactId>
7374
<version>${couchbase}</version>
7475
</dependency>
76+
-->
7577

7678
<dependency>
7779
<groupId>org.springframework</groupId>
@@ -221,9 +223,32 @@
221223
<version>4.0.3</version>
222224
<scope>test</scope>
223225
</dependency>
226+
<dependency>
227+
<groupId>com.couchbase.client</groupId>
228+
<artifactId>couchbase-transactions</artifactId>
229+
<version>1.2.1-SNAPSHOT</version>
230+
<scope>compile</scope>
231+
</dependency>
232+
<dependency>
233+
<groupId>org.testcontainers</groupId>
234+
<artifactId>testcontainers</artifactId>
235+
</dependency>
236+
<dependency>
237+
<groupId>com.couchbase.client</groupId>
238+
<artifactId>java-client</artifactId>
239+
<version>3.2.2-SNAPSHOT</version>
240+
<scope>compile</scope>
241+
</dependency>
224242

225243
</dependencies>
226244

245+
246+
247+
<modules>
248+
<module>../couchbase-transactions-java</module>
249+
<module>../../couchbase-jvm-clients/java-client</module>
250+
</modules>
251+
227252
<repositories>
228253
<repository>
229254
<id>spring-libs-snapshot</id>

src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java

+12
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818

1919
import java.io.Closeable;
2020

21+
import com.couchbase.transactions.AttemptContextReactive;
22+
import com.couchbase.transactions.Transactions;
23+
import com.couchbase.transactions.config.TransactionConfig;
2124
import org.springframework.dao.support.PersistenceExceptionTranslator;
2225

2326
import com.couchbase.client.java.Bucket;
2427
import com.couchbase.client.java.Cluster;
2528
import com.couchbase.client.java.Collection;
2629
import com.couchbase.client.java.Scope;
30+
import org.springframework.data.couchbase.transaction.ClientSession;
31+
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
32+
import org.springframework.data.couchbase.transaction.CouchbaseStuffHandle;
2733

2834
/**
2935
* The {@link CouchbaseClientFactory} is the main way to get access to the managed SDK instance and resources.
@@ -73,4 +79,10 @@ public interface CouchbaseClientFactory extends Closeable {
7379
*/
7480
PersistenceExceptionTranslator getExceptionTranslator();
7581

82+
ClientSession getSession(ClientSessionOptions options, Transactions transactions,
83+
TransactionConfig config , AttemptContextReactive atr);
84+
85+
CouchbaseClientFactory with(CouchbaseStuffHandle txOp);
86+
87+
CouchbaseStuffHandle getTransactionalOperator();
7688
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2016-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.couchbase;
17+
18+
import com.couchbase.client.java.Bucket;
19+
import com.couchbase.client.java.Cluster;
20+
import com.couchbase.client.java.ClusterInterface;
21+
import com.couchbase.client.java.Collection;
22+
import com.couchbase.client.java.Scope;
23+
import com.couchbase.transactions.AttemptContextReactive;
24+
import com.couchbase.transactions.Transactions;
25+
import com.couchbase.transactions.config.TransactionConfig;
26+
import org.springframework.data.couchbase.transaction.ClientSession;
27+
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
28+
import org.springframework.data.couchbase.transaction.CouchbaseStuffHandle;
29+
import reactor.core.publisher.Mono;
30+
31+
import org.springframework.dao.support.PersistenceExceptionTranslator;
32+
33+
import java.io.IOException;
34+
35+
36+
/**
37+
* Interface for factories creating reactive {@link Cluster} instances.
38+
*
39+
* @author Mark Paluch
40+
* @author Christoph Strobl
41+
* @author Mathieu Ouellet
42+
* @since 2.0
43+
*/
44+
public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*/ {
45+
46+
/**
47+
* Provides access to the managed SDK {@link Cluster} reference.
48+
*/
49+
//Cluster getCluster();
50+
51+
Mono<ClusterInterface> getCluster();
52+
53+
/**
54+
* Provides access to the managed SDK {@link Bucket} reference.
55+
*/
56+
Mono<Bucket> getBucket();
57+
58+
/**
59+
* Provides access to the managed SDK {@link Scope} reference.
60+
*/
61+
//Scope getScope();
62+
63+
Mono<Scope> getScope();
64+
65+
/**
66+
* Provides access to a collection (identified by its name) in managed SDK {@link Scope} reference.
67+
*
68+
* @param name the name of the collection. If null is passed in, the default collection is assumed.
69+
*/
70+
//Collection getCollection(String name);
71+
72+
Mono<Collection> getCollection(String name);
73+
/**
74+
* Provides access to the default collection.
75+
*/
76+
Mono<Collection> getDefaultCollection();
77+
78+
/**
79+
* Returns a new {@link CouchbaseClientFactory} set to the scope given as an argument.
80+
*
81+
* @param scopeName the name of the scope to use for all collection access.
82+
* @return a new client factory, bound to the other scope.
83+
*/
84+
ReactiveCouchbaseClientFactory withScope(String scopeName);
85+
86+
/**
87+
* The exception translator used on the factory.
88+
*/
89+
PersistenceExceptionTranslator getExceptionTranslator();
90+
91+
Mono<ClientSession> getSession(ClientSessionOptions options, Transactions transactions, TransactionConfig config);
92+
93+
String getBucketName();
94+
95+
String getScopeName();
96+
97+
void close() throws IOException;
98+
99+
Mono<ClientSession> getSession(ClientSessionOptions options);
100+
101+
ClientSession getSession(ClientSessionOptions options, Transactions transactions, TransactionConfig config,
102+
AttemptContextReactive atr);
103+
104+
/*
105+
* (non-Javadoc)
106+
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#withSession(com.mongodb.session.ClientSession)
107+
*/
108+
ReactiveCouchbaseClientFactory withSession(ClientSession session);
109+
110+
/*
111+
* (non-Javadoc)
112+
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#isTransactionActive()
113+
*/
114+
boolean isTransactionActive();
115+
116+
CouchbaseStuffHandle getTransactionalOperator();
117+
118+
ReactiveCouchbaseClientFactory with(CouchbaseStuffHandle txOp);
119+
}

src/main/java/org/springframework/data/couchbase/SimpleCouchbaseClientFactory.java

+62
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,22 @@
1515
*/
1616
package org.springframework.data.couchbase;
1717

18+
import java.time.Duration;
19+
import java.time.temporal.ChronoUnit;
20+
import java.util.Optional;
21+
import java.util.UUID;
1822
import java.util.function.Supplier;
1923

24+
import com.couchbase.transactions.TransactionContext;
25+
import com.couchbase.transactions.config.MergedTransactionConfig;
26+
import com.couchbase.transactions.config.PerTransactionConfig;
27+
import com.couchbase.transactions.config.PerTransactionConfigBuilder;
2028
import org.springframework.dao.support.PersistenceExceptionTranslator;
2129
import org.springframework.data.couchbase.core.CouchbaseExceptionTranslator;
30+
import org.springframework.data.couchbase.transaction.ClientSession;
31+
import org.springframework.data.couchbase.transaction.ClientSessionImpl;
32+
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
33+
import org.springframework.data.couchbase.transaction.CouchbaseStuffHandle;
2234

2335
import com.couchbase.client.core.env.Authenticator;
2436
import com.couchbase.client.core.env.OwnedSupplier;
@@ -29,6 +41,9 @@
2941
import com.couchbase.client.java.Collection;
3042
import com.couchbase.client.java.Scope;
3143
import com.couchbase.client.java.env.ClusterEnvironment;
44+
import com.couchbase.transactions.AttemptContextReactive;
45+
import com.couchbase.transactions.Transactions;
46+
import com.couchbase.transactions.config.TransactionConfig;
3247

3348
/**
3449
* The default implementation of a {@link CouchbaseClientFactory}.
@@ -42,6 +57,7 @@ public class SimpleCouchbaseClientFactory implements CouchbaseClientFactory {
4257
private final Bucket bucket;
4358
private final Scope scope;
4459
private final PersistenceExceptionTranslator exceptionTranslator;
60+
private final CouchbaseStuffHandle transactionalOperator;
4561

4662
public SimpleCouchbaseClientFactory(final String connectionString, final Authenticator authenticator,
4763
final String bucketName) {
@@ -68,10 +84,16 @@ public SimpleCouchbaseClientFactory(final Cluster cluster, final String bucketNa
6884

6985
private SimpleCouchbaseClientFactory(final Supplier<Cluster> cluster, final String bucketName,
7086
final String scopeName) {
87+
this(cluster, bucketName, scopeName, null);
88+
}
89+
90+
private SimpleCouchbaseClientFactory(final Supplier<Cluster> cluster, final String bucketName, final String scopeName,
91+
final CouchbaseStuffHandle transactionalOperator) {
7192
this.cluster = cluster;
7293
this.bucket = cluster.get().bucket(bucketName);
7394
this.scope = scopeName == null ? bucket.defaultScope() : bucket.scope(scopeName);
7495
this.exceptionTranslator = new CouchbaseExceptionTranslator();
96+
this.transactionalOperator = transactionalOperator;
7597
}
7698

7799
@Override
@@ -116,11 +138,51 @@ public PersistenceExceptionTranslator getExceptionTranslator() {
116138
return exceptionTranslator;
117139
}
118140

141+
@Override
142+
public ClientSession getSession(ClientSessionOptions options, Transactions transactions, TransactionConfig config,
143+
AttemptContextReactive atr) {
144+
145+
AttemptContextReactive at = atr != null ? atr : transactions.reactive().newAttemptContextReactive();
146+
147+
return new ClientSessionImpl(this, transactions, config, at);
148+
}
149+
150+
/* copied from AttemptContextReactive - needs to have cleanup() and createAttemptContext() public
151+
152+
public AttemptContextReactive newAttemptContextReactive(Transactions transactions, TransactionConfig config){
153+
PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build();
154+
MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.of(perConfig));
155+
156+
TransactionContext overall = new TransactionContext(
157+
transactions.reactive().cleanup().clusterData().cluster().environment().requestTracer(),
158+
transactions.reactive().cleanup().clusterData().cluster().environment().eventBus(),
159+
UUID.randomUUID().toString(), now(), Duration.ZERO, merged);
160+
161+
String txnId = UUID.randomUUID().toString();
162+
//overall.LOGGER.info(configDebug(config, perConfig));
163+
return transactions.reactive().createAttemptContext(overall, merged, txnId);
164+
}
165+
*/
166+
167+
@Override
168+
public CouchbaseClientFactory with(CouchbaseStuffHandle txOp) {
169+
return new SimpleCouchbaseClientFactory(cluster, bucket.name(), scope.name(), txOp);
170+
}
171+
172+
@Override
173+
public CouchbaseStuffHandle getTransactionalOperator() {
174+
return (CouchbaseStuffHandle) transactionalOperator;
175+
}
176+
119177
@Override
120178
public void close() {
121179
if (cluster instanceof OwnedSupplier) {
122180
cluster.get().disconnect();
123181
}
124182
}
125183

184+
private static Duration now() {
185+
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
186+
}
187+
126188
}

0 commit comments

Comments
 (0)