Skip to content

Commit df19157

Browse files
committed
Get scope and collection from pseudoArgs and some cleanup.
1 parent 56ad3fa commit df19157

File tree

42 files changed

+1179
-1205
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1179
-1205
lines changed

src/main/java/com/couchbase/client/java/Cluster.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public static Cluster connect(final Set<SeedNode> seedNodes, final ClusterOption
285285
notNull(options, "ClusterOptions");
286286

287287
final ClusterOptions.Built opts = options.build();
288-
return new Cluster(extractClusterEnvironment(null, opts), opts.authenticator(), seedNodes);
288+
return new Cluster(extractClusterEnvironment("", opts), opts.authenticator(), seedNodes);
289289
}
290290

291291
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
/*
3+
* Copyright 2021 the original author or authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.couchbase.client.java.transactions;
18+
19+
import java.lang.reflect.Field;
20+
import java.lang.reflect.Method;
21+
import java.time.Duration;
22+
import java.time.temporal.ChronoUnit;
23+
import java.util.Optional;
24+
import java.util.UUID;
25+
26+
import com.couchbase.client.core.annotation.Stability;
27+
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
28+
import com.couchbase.client.core.transaction.CoreTransactionContext;
29+
import com.couchbase.client.core.transaction.CoreTransactionsReactive;
30+
import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
31+
import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
32+
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
33+
import com.couchbase.client.java.codec.JsonSerializer;
34+
import reactor.core.publisher.Mono;
35+
36+
/**
37+
* To access the ReactiveTransactionAttemptContext held by TransactionAttemptContext
38+
*
39+
* @author Michael Reiche
40+
*/
41+
public class AttemptContextReactiveAccessor {
42+
43+
public static ReactiveTransactionAttemptContext getACR(TransactionAttemptContext attemptContext) {
44+
// return attemptContext.ctx();
45+
// todo gp is this access needed. Could hold the raw CoreTransactionAttemptContext instead.
46+
return null;
47+
}
48+
49+
public static ReactiveTransactions reactive(Transactions transactions) {
50+
try {
51+
Field field = Transactions.class.getDeclaredField("reactive");
52+
field.setAccessible(true);
53+
return (ReactiveTransactions) field.get(transactions);
54+
} catch (Throwable err) {
55+
throw new RuntimeException(err);
56+
}
57+
}
58+
59+
public static ReactiveTransactionAttemptContext reactive(TransactionAttemptContext atr) {
60+
JsonSerializer serializer;
61+
try {
62+
Field field = TransactionAttemptContext.class.getDeclaredField("serializer");
63+
field.setAccessible(true);
64+
serializer = (JsonSerializer) field.get(atr);
65+
} catch (Throwable err) {
66+
throw new RuntimeException(err);
67+
}
68+
try {
69+
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
70+
field.setAccessible(true);
71+
return new ReactiveTransactionAttemptContext((CoreTransactionAttemptContext) field.get(atr), serializer);
72+
} catch (Throwable err) {
73+
throw new RuntimeException(err);
74+
}
75+
}
76+
77+
public static TransactionAttemptContext blocking(ReactiveTransactionAttemptContext atr) {
78+
JsonSerializer serializer;
79+
try {
80+
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("serializer");
81+
field.setAccessible(true);
82+
serializer = (JsonSerializer) field.get(atr);
83+
} catch (Throwable err) {
84+
throw new RuntimeException(err);
85+
}
86+
try {
87+
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("internal");
88+
field.setAccessible(true);
89+
return new TransactionAttemptContext((CoreTransactionAttemptContext) field.get(atr), serializer);
90+
} catch (Throwable err) {
91+
throw new RuntimeException(err);
92+
}
93+
}
94+
95+
public static CoreTransactionLogger getLogger(ReactiveTransactionAttemptContext attemptContextReactive) {
96+
// todo gp needed?
97+
return null;
98+
// return attemptContextReactive;
99+
}
100+
101+
// todo gp needed?
102+
@Stability.Internal
103+
public static CoreTransactionAttemptContext newAttemptContextReactive(ReactiveTransactions transactions) {
104+
// PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build();
105+
// MergedTransactionConfig merged = new MergedTransactionConfig(transactions.config(), Optional.of(perConfig));
106+
//
107+
// TransactionContext overall = new TransactionContext(
108+
// transactions.cleanup().clusterData().cluster().environment().requestTracer(),
109+
// transactions.cleanup().clusterData().cluster().environment().eventBus(),
110+
// UUID.randomUUID().toString(), now(), Duration.ZERO, merged);
111+
112+
String txnId = UUID.randomUUID().toString();
113+
// overall.LOGGER.info(configDebug(transactions.config(), perConfig));
114+
115+
CoreTransactionsReactive coreTransactionsReactive;
116+
try {
117+
Field field = ReactiveTransactions.class.getDeclaredField("internal");
118+
field.setAccessible(true);
119+
coreTransactionsReactive = (CoreTransactionsReactive) field.get(transactions);
120+
} catch (Throwable err) {
121+
throw new RuntimeException(err);
122+
}
123+
124+
CoreTransactionOptions perConfig = new CoreTransactionOptions(Optional.empty(),
125+
Optional.empty(),
126+
Optional.empty(),
127+
Optional.of(Duration.ofMinutes(10)),
128+
Optional.empty(),
129+
Optional.empty());
130+
131+
CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(coreTransactionsReactive.config(),
132+
Optional.ofNullable(perConfig));
133+
CoreTransactionContext overall = new CoreTransactionContext(
134+
coreTransactionsReactive.core().context().environment().requestTracer(),
135+
coreTransactionsReactive.core().context().environment().eventBus(), UUID.randomUUID().toString(), merged,
136+
coreTransactionsReactive.core().transactionsCleanup());
137+
// overall.LOGGER.info(configDebug(config, perConfig, cleanup.clusterData().cluster().core()));
138+
139+
CoreTransactionAttemptContext coreTransactionAttemptContext = coreTransactionsReactive.createAttemptContext(overall,
140+
merged, txnId);
141+
return coreTransactionAttemptContext;
142+
// ReactiveTransactionAttemptContext reactiveTransactionAttemptContext = new ReactiveTransactionAttemptContext(
143+
// coreTransactionAttemptContext, null);
144+
// return reactiveTransactionAttemptContext;
145+
}
146+
147+
private static Duration now() {
148+
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
149+
}
150+
151+
public static ReactiveTransactionAttemptContext from(CoreTransactionAttemptContext coreTransactionAttemptContext,
152+
JsonSerializer serializer) {
153+
TransactionAttemptContext tac = new TransactionAttemptContext(coreTransactionAttemptContext, serializer);
154+
return reactive(tac);
155+
}
156+
157+
public static CoreTransactionAttemptContext getCore(ReactiveTransactionAttemptContext atr) {
158+
CoreTransactionAttemptContext coreTransactionsReactive;
159+
try {
160+
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("internal");
161+
field.setAccessible(true);
162+
coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
163+
} catch (Throwable err) {
164+
throw new RuntimeException(err);
165+
}
166+
return coreTransactionsReactive;
167+
}
168+
169+
public static Mono<Void> implicitCommit(ReactiveTransactionAttemptContext atr, boolean b) {
170+
CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr);
171+
try {
172+
// getDeclaredMethod() does not find it (because of primitive arg?)
173+
// CoreTransactionAttemptContext.class.getDeclaredMethod("implicitCommit", Boolean.class);
174+
Method[] methods = CoreTransactionAttemptContext.class.getDeclaredMethods();
175+
Method method = null;
176+
for(Method m:methods){
177+
if( m.getName().equals("implicitCommit")){
178+
method = m;
179+
break;
180+
}
181+
}
182+
if(method == null){
183+
throw new RuntimeException("did not find implicitCommit method");
184+
}
185+
method.setAccessible(true);
186+
return (Mono<Void>)method.invoke(coreTransactionsReactive, b);
187+
} catch (Throwable err) {
188+
throw new RuntimeException(err);
189+
}
190+
191+
}
192+
193+
// todo gp if needed let's expose in the SDK
194+
// static private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
195+
// StringBuilder sb = new StringBuilder();
196+
// sb.append("library version: ");
197+
// sb.append(TransactionsReactive.class.getPackage().getImplementationVersion());
198+
// sb.append(" config: ");
199+
// sb.append("atrs=");
200+
// sb.append(config.numAtrs());
201+
// sb.append(", metadataCollection=");
202+
// sb.append(config.metadataCollection());
203+
// sb.append(", expiry=");
204+
// sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis());
205+
// sb.append("msecs durability=");
206+
// sb.append(config.durabilityLevel());
207+
// sb.append(" per-txn config=");
208+
// sb.append(" durability=");
209+
// sb.append(perConfig.durabilityLevel());
210+
// sb.append(", supported=");
211+
// sb.append(Supported.SUPPORTED);
212+
// return sb.toString();
213+
// }
214+
215+
}

src/main/java/com/couchbase/transactions/AttemptContextReactiveAccessor.java

-93
This file was deleted.

src/main/java/com/example/demo/CouchbaseTransactionManager.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@
44

55
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
66
import com.couchbase.client.java.transactions.TransactionAttemptContext;
7-
import com.couchbase.client.java.transactions.TransactionResult;
8-
import com.couchbase.transactions.AttemptContextReactiveAccessor;
7+
import com.couchbase.client.java.transactions.AttemptContextReactiveAccessor;
98
import org.slf4j.Logger;
109
import org.slf4j.LoggerFactory;
1110
import org.springframework.beans.factory.DisposableBean;
12-
import org.springframework.data.couchbase.CouchbaseClientFactory;
1311
import org.springframework.data.couchbase.core.CouchbaseTemplate;
1412
import org.springframework.data.couchbase.transaction.ClientSession;
1513
import org.springframework.data.couchbase.transaction.ClientSessionImpl;

src/main/java/org/springframework/data/couchbase/CouchbaseClientFactory.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.io.Closeable;
2020

2121
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
22+
import com.couchbase.client.java.transactions.Transactions;
23+
import com.couchbase.client.java.transactions.config.TransactionsConfig;
2224
import org.springframework.dao.support.PersistenceExceptionTranslator;
2325

2426
import com.couchbase.client.java.Bucket;
@@ -77,9 +79,10 @@ public interface CouchbaseClientFactory extends Closeable {
7779
*/
7880
PersistenceExceptionTranslator getExceptionTranslator();
7981

80-
ClientSession getSession(ClientSessionOptions options, ReactiveTransactionAttemptContext atr);
82+
ClientSession getSession(ClientSessionOptions options,
83+
ReactiveTransactionAttemptContext atr);
8184

82-
//CouchbaseClientFactory with(CouchbaseStuffHandle txOp);
85+
//CouchbaseClientFactory with(CouchbaseStuffHandle txOp);
8386

8487
//CouchbaseStuffHandle getTransactionalOperator();
8588
}

src/main/java/org/springframework/data/couchbase/ReactiveCouchbaseClientFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.couchbase.client.java.Collection;
2222
import com.couchbase.client.java.Scope;
2323
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
24+
import com.couchbase.client.java.transactions.TransactionAttemptContext;
25+
import com.couchbase.client.java.transactions.Transactions;
2426
import org.springframework.data.couchbase.transaction.ClientSession;
2527
import org.springframework.data.couchbase.transaction.ClientSessionOptions;
2628
import org.springframework.data.couchbase.transaction.CouchbaseStuffHandle;

0 commit comments

Comments
 (0)