Skip to content

Commit 9e7fdde

Browse files
committed
Add ReactiveTransactionWrapper/TransactionWrapper and a bunch of cleanup.
1 parent bc4dc01 commit 9e7fdde

20 files changed

+498
-654
lines changed

src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java

+38-44
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.time.temporal.ChronoUnit;
2323
import java.util.Optional;
2424
import java.util.UUID;
25+
import java.util.function.Consumer;
26+
import java.util.logging.Logger;
2527

2628
import com.couchbase.client.core.annotation.Stability;
2729
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
@@ -33,6 +35,7 @@
3335
import com.couchbase.client.core.transaction.support.AttemptState;
3436
import com.couchbase.client.java.codec.JsonSerializer;
3537
import reactor.core.publisher.Mono;
38+
import reactor.util.annotation.Nullable;
3639

3740
/**
3841
* To access the ReactiveTransactionAttemptContext held by TransactionAttemptContext
@@ -66,13 +69,7 @@ public static ReactiveTransactionAttemptContext reactive(TransactionAttemptConte
6669
} catch (Throwable err) {
6770
throw new RuntimeException(err);
6871
}
69-
try {
70-
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
71-
field.setAccessible(true);
72-
return new ReactiveTransactionAttemptContext((CoreTransactionAttemptContext) field.get(atr), serializer);
73-
} catch (Throwable err) {
74-
throw new RuntimeException(err);
75-
}
72+
return new ReactiveTransactionAttemptContext(getCore(atr), serializer);
7673
}
7774

7875
public static TransactionAttemptContext blocking(ReactiveTransactionAttemptContext atr) {
@@ -84,35 +81,22 @@ public static TransactionAttemptContext blocking(ReactiveTransactionAttemptConte
8481
} catch (Throwable err) {
8582
throw new RuntimeException(err);
8683
}
87-
try {
88-
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("internal");
89-
field.setAccessible(true);
90-
return new TransactionAttemptContext((CoreTransactionAttemptContext) field.get(atr), serializer);
91-
} catch (Throwable err) {
92-
throw new RuntimeException(err);
93-
}
84+
return new TransactionAttemptContext(getCore(atr), serializer);
9485
}
9586

9687
public static CoreTransactionLogger getLogger(ReactiveTransactionAttemptContext attemptContextReactive) {
97-
// todo gp needed?
98-
return null;
99-
// return attemptContextReactive;
88+
return attemptContextReactive.logger();
89+
}
90+
91+
public static CoreTransactionLogger getLogger(TransactionAttemptContext attemptContextReactive) {
92+
return attemptContextReactive.logger();
10093
}
10194

10295
// todo gp needed?
10396
@Stability.Internal
10497
public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(ReactiveTransactions transactions) {
105-
// PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build();
106-
// MergedTransactionConfig merged = new MergedTransactionConfig(transactions.config(), Optional.of(perConfig));
107-
//
108-
// TransactionContext overall = new TransactionContext(
109-
// transactions.cleanup().clusterData().cluster().environment().requestTracer(),
110-
// transactions.cleanup().clusterData().cluster().environment().eventBus(),
111-
// UUID.randomUUID().toString(), now(), Duration.ZERO, merged);
11298

11399
String txnId = UUID.randomUUID().toString();
114-
// overall.LOGGER.info(configDebug(transactions.config(), perConfig));
115-
116100
CoreTransactionsReactive coreTransactionsReactive;
117101
try {
118102
Field field = ReactiveTransactions.class.getDeclaredField("internal");
@@ -122,27 +106,19 @@ public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(Reac
122106
throw new RuntimeException(err);
123107
}
124108

125-
CoreTransactionOptions perConfig = new CoreTransactionOptions(Optional.empty(),
126-
Optional.empty(),
127-
Optional.empty(),
128-
Optional.of(Duration.ofMinutes(10)),
129-
Optional.empty(),
130-
Optional.empty());
109+
CoreTransactionOptions perConfig = new CoreTransactionOptions(Optional.empty(), Optional.empty(), Optional.empty(),
110+
Optional.of(Duration.ofMinutes(10)), Optional.empty(), Optional.empty());
131111

132112
CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(coreTransactionsReactive.config(),
133113
Optional.ofNullable(perConfig));
134114
CoreTransactionContext overall = new CoreTransactionContext(
135115
coreTransactionsReactive.core().context().environment().requestTracer(),
136116
coreTransactionsReactive.core().context().environment().eventBus(), UUID.randomUUID().toString(), merged,
137117
coreTransactionsReactive.core().transactionsCleanup());
138-
// overall.LOGGER.info(configDebug(config, perConfig, cleanup.clusterData().cluster().core()));
139118

140119
CoreTransactionAttemptContext coreTransactionAttemptContext = coreTransactionsReactive.createAttemptContext(overall,
141120
merged, txnId);
142121
return coreTransactionAttemptContext;
143-
// ReactiveTransactionAttemptContext reactiveTransactionAttemptContext = new ReactiveTransactionAttemptContext(
144-
// coreTransactionAttemptContext, null);
145-
// return reactiveTransactionAttemptContext;
146122
}
147123

148124
private static Duration now() {
@@ -168,15 +144,13 @@ public static CoreTransactionAttemptContext getCore(ReactiveTransactionAttemptCo
168144
}
169145

170146
public static CoreTransactionAttemptContext getCore(TransactionAttemptContext atr) {
171-
CoreTransactionAttemptContext coreTransactionsReactive;
172147
try {
173148
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
174149
field.setAccessible(true);
175-
coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
150+
return (CoreTransactionAttemptContext) field.get(atr);
176151
} catch (Throwable err) {
177152
throw new RuntimeException(err);
178153
}
179-
return coreTransactionsReactive;
180154
}
181155

182156
public static Mono<Void> implicitCommit(ReactiveTransactionAttemptContext atr, boolean b) {
@@ -186,17 +160,17 @@ public static Mono<Void> implicitCommit(ReactiveTransactionAttemptContext atr, b
186160
// CoreTransactionAttemptContext.class.getDeclaredMethod("implicitCommit", Boolean.class);
187161
Method[] methods = CoreTransactionAttemptContext.class.getDeclaredMethods();
188162
Method method = null;
189-
for(Method m:methods){
190-
if( m.getName().equals("implicitCommit")){
163+
for (Method m : methods) {
164+
if (m.getName().equals("implicitCommit")) {
191165
method = m;
192166
break;
193167
}
194168
}
195-
if(method == null){
169+
if (method == null) {
196170
throw new RuntimeException("did not find implicitCommit method");
197171
}
198172
method.setAccessible(true);
199-
return (Mono<Void>)method.invoke(coreTransactionsReactive, b);
173+
return (Mono<Void>) method.invoke(coreTransactionsReactive, b);
200174
} catch (Throwable err) {
201175
throw new RuntimeException(err);
202176
}
@@ -214,10 +188,30 @@ public static AttemptState getState(ReactiveTransactionAttemptContext atr) {
214188
}
215189
}
216190

217-
public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
191+
public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(
192+
CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
218193
return new ReactiveTransactionAttemptContext(core, jsonSerializer);
219194
}
220195

196+
public static CoreTransactionsReactive getCoreTransactionsReactive(ReactiveTransactions transactions) {
197+
try {
198+
Field field = ReactiveTransactions.class.getDeclaredField("internal");
199+
field.setAccessible(true);
200+
return (CoreTransactionsReactive) field.get(transactions);
201+
} catch (Throwable err) {
202+
throw new RuntimeException(err);
203+
}
204+
}
205+
206+
public static TransactionAttemptContext newTransactionAttemptContext(CoreTransactionAttemptContext ctx,
207+
JsonSerializer jsonSerializer) {
208+
return new TransactionAttemptContext(ctx, jsonSerializer);
209+
}
210+
211+
public static TransactionResult run(Transactions transactions, Consumer<TransactionAttemptContext> transactionLogic, CoreTransactionOptions coreTransactionOptions) {
212+
return reactive(transactions).runBlocking(transactionLogic, coreTransactionOptions);
213+
}
214+
221215
// todo gp if needed let's expose in the SDK
222216
// static private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
223217
// StringBuilder sb = new StringBuilder();

src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
import java.io.IOException;
3434

35-
3635
/**
3736
* Interface for factories creating reactive {@link Cluster} instances.
3837
*
@@ -42,12 +41,17 @@
4241
* @since 2.0
4342
*/
4443
public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*/ {
45-
44+
4645
/**
4746
* Provides access to the managed SDK {@link Cluster} reference.
4847
*/
4948
Mono<ClusterInterface> getCluster();
5049

50+
/**
51+
* Provides access to the managed SDK {@link Cluster} reference.
52+
*/
53+
ClusterInterface getBlockingCluster();
54+
5155
/**
5256
* Provides access to the managed SDK {@link Bucket} reference.
5357
*/
@@ -58,13 +62,25 @@ public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*
5862
*/
5963
Mono<Scope> getScope();
6064

65+
/**
66+
* Provides access to the managed SDK {@link Scope} reference without block()
67+
*/
68+
Scope getBlockingScope(String scopeName);
69+
6170
/**
6271
* Provides access to a collection (identified by its name) in managed SDK {@link Scope} reference.
6372
*
6473
* @param name the name of the collection. If null is passed in, the default collection is assumed.
6574
*/
6675
Mono<Collection> getCollection(String name);
6776

77+
/**
78+
* Provides access to a collection (identified by its name) without block()
79+
*
80+
* @param name the name of the collection. If null is passed in, the default collection is assumed.
81+
*/
82+
Collection getBlockingCollection(String collectionName);
83+
6884
/**
6985
* Provides access to the default collection.
7086
*/
@@ -91,7 +107,8 @@ public interface ReactiveCouchbaseClientFactory /*extends CodecRegistryProvider*
91107

92108
void close() throws IOException;
93109

94-
ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options, CoreTransactionAttemptContext ctx);
110+
ReactiveCouchbaseResourceHolder getTransactionResources(TransactionOptions options,
111+
CoreTransactionAttemptContext ctx);
95112

96113
/*
97114
* (non-Javadoc)

src/main/java/org/springframework/data/couchbase/SimpleReactiveCouchbaseClientFactory.java

+43
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
public class SimpleReactiveCouchbaseClientFactory implements ReactiveCouchbaseClientFactory {
3131
final Mono<ClusterInterface> cluster;
32+
final ClusterInterface theCluster;
3233
final String bucketName;
3334
final String scopeName;
3435
final PersistenceExceptionTranslator exceptionTranslator;
@@ -39,6 +40,7 @@ public class SimpleReactiveCouchbaseClientFactory implements ReactiveCouchbaseCl
3940
public SimpleReactiveCouchbaseClientFactory(Cluster cluster, String bucketName, String scopeName,
4041
CouchbaseTransactionalOperator transactionalOperator) {
4142
this.cluster = Mono.just(cluster);
43+
this.theCluster = cluster;
4244
this.bucketName = bucketName;
4345
this.scopeName = scopeName;
4446
this.exceptionTranslator = new CouchbaseExceptionTranslator();
@@ -56,6 +58,12 @@ public Mono<ClusterInterface> getCluster() {
5658
return cluster;
5759
}
5860

61+
62+
@Override
63+
public ClusterInterface getBlockingCluster() {
64+
return theCluster;
65+
}
66+
5967
@Override
6068
public Mono<Bucket> getBucket() {
6169
return cluster.map((c) -> c.bucket(bucketName));
@@ -71,6 +79,11 @@ public Mono<Scope> getScope() {
7179
return cluster.map((c) -> c.bucket(bucketName).scope(scopeName != null ? scopeName : DEFAULT_SCOPE));
7280
}
7381

82+
@Override
83+
public Scope getBlockingScope(String scopeName) {
84+
return theCluster.bucket(bucketName).scope(scopeName != null ? scopeName : (this.scopeName != null ? this.scopeName : DEFAULT_SCOPE));
85+
}
86+
7487
@Override
7588
public String getScopeName() {
7689
return scopeName;
@@ -92,6 +105,22 @@ public Mono<Collection> getCollection(String collectionName) {
92105
return getScope().map((s) -> s.collection(collectionName != null ? collectionName : DEFAULT_COLLECTION));
93106
}
94107

108+
@Override
109+
public Collection getBlockingCollection(String collectionName) {
110+
if (getScopeName() != null && !DEFAULT_SCOPE.equals(getScopeName())) {
111+
if (collectionName == null || DEFAULT_COLLECTION.equals(collectionName)) {
112+
throw new IllegalStateException("A collectionName must be provided if a non-default scope is used.");
113+
}
114+
}
115+
if (getScopeName() == null || DEFAULT_SCOPE.equals(getScopeName())) {
116+
if (collectionName != null && !DEFAULT_COLLECTION.equals(collectionName)) {
117+
throw new IllegalStateException(
118+
"A collectionName must be null or " + DEFAULT_COLLECTION + " if scope is null or " + DEFAULT_SCOPE);
119+
}
120+
}
121+
return theCluster.bucket(bucketName).scope(scopeName != null ? scopeName : DEFAULT_SCOPE).collection(collectionName != null ? collectionName : DEFAULT_COLLECTION);
122+
}
123+
95124
@Override
96125
public Mono<Collection> getDefaultCollection() {
97126
if (getScopeName() != null && DEFAULT_SCOPE.equals(getScopeName())) {
@@ -204,6 +233,11 @@ public Mono<ClusterInterface> getCluster() throws DataAccessException {
204233
return delegate.getCluster().map(this::decorateDatabase);
205234
}
206235

236+
@Override
237+
public ClusterInterface getBlockingCluster() throws DataAccessException {
238+
return decorateDatabase(delegate.getBlockingCluster());
239+
}
240+
207241
@Override
208242
public Mono<Bucket> getBucket() {
209243
return delegate.getBucket();
@@ -219,6 +253,15 @@ public Mono<Collection> getCollection(String name) {
219253
return delegate.getCollection(name);
220254
}
221255

256+
@Override
257+
public Collection getBlockingCollection(String collectionName) {
258+
return delegate.getBlockingCollection(collectionName);
259+
}
260+
261+
@Override
262+
public Scope getBlockingScope(String scopeName) {
263+
return delegate.getBlockingScope(scopeName);
264+
}
222265
@Override
223266
public Mono<Collection> getDefaultCollection() {
224267
return delegate.getDefaultCollection();

src/main/java/org/springframework/data/couchbase/core/ReactiveExistsByIdOperationSupport.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public Mono<Boolean> one(final String id) {
7575
LOG.trace("existsById {}", pArgs);
7676
return Mono.just(id)
7777
.flatMap(docId -> template.getCouchbaseClientFactory().withScope(pArgs.getScope())
78-
.getCollection(pArgs.getCollection()).block().reactive().exists(id, buildOptions(pArgs.getOptions()))
78+
.getBlockingCollection(pArgs.getCollection()).reactive().exists(id, buildOptions(pArgs.getOptions()))
7979
.map(ExistsResult::exists))
8080
.onErrorMap(throwable -> {
8181
if (throwable instanceof RuntimeException) {

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByAnalyticsOperationSupport.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public Mono<T> first() {
109109
public Flux<T> all() {
110110
return Flux.defer(() -> {
111111
String statement = assembleEntityQuery(false);
112-
return template.getCouchbaseClientFactory().getCluster().block().reactive()
112+
return template.getCouchbaseClientFactory().getBlockingCluster().reactive()
113113
.analyticsQuery(statement, buildAnalyticsOptions()).onErrorMap(throwable -> {
114114
if (throwable instanceof RuntimeException) {
115115
return template.potentiallyConvertRuntimeException((RuntimeException) throwable);
@@ -142,7 +142,7 @@ public Flux<T> all() {
142142
public Mono<Long> count() {
143143
return Mono.defer(() -> {
144144
String statement = assembleEntityQuery(true);
145-
return template.getCouchbaseClientFactory().getCluster().block().reactive()
145+
return template.getCouchbaseClientFactory().getBlockingCluster().reactive()
146146
.analyticsQuery(statement, buildAnalyticsOptions()).onErrorMap(throwable -> {
147147
if (throwable instanceof RuntimeException) {
148148
return template.potentiallyConvertRuntimeException((RuntimeException) throwable);

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByIdOperationSupport.java

+1-11
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public Mono<T> one(final String id) {
9292
LOG.trace("findById {}", pArgs);
9393

9494
ReactiveCollection rc = template.getCouchbaseClientFactory().withScope(pArgs.getScope())
95-
.getCollection(pArgs.getCollection()).block().reactive();
95+
.getBlockingCollection(pArgs.getCollection()).reactive();
9696

9797
// this will get me a template with a session holding tx
9898
Mono<ReactiveCouchbaseTemplate> tmpl = template.doGetTemplate();
@@ -118,16 +118,6 @@ public Mono<T> one(final String id) {
118118
// todo gp no cas // todo mr - it's required by replace().one when comparing to internal.cas(). it's gone
119119
// todo gp if we need this of course needs to be exposed nicely
120120
Long cas = result.cas();
121-
/*
122-
try {
123-
Method method = TransactionGetResult.class.getDeclaredMethod("internal");
124-
method.setAccessible(true);
125-
CoreTransactionGetResult internal = (CoreTransactionGetResult) method.invoke(result);
126-
cas = internal.cas();
127-
} catch (Throwable err) {
128-
throw new RuntimeException(err);
129-
}
130-
*/
131121
return support.decodeEntity(id, new String(result.contentAsBytes()), cas, domainType, pArgs.getScope(),
132122
pArgs.getCollection(), new TransactionResultHolder(result), null).doOnNext(out -> {
133123
// todo gp is this safe? are we on the right thread?

0 commit comments

Comments
 (0)