Skip to content

Commit bc4dc01

Browse files
committed
Get scope and collection from pseudoArgs and some cleanup.
1 parent 56ad3fa commit bc4dc01

File tree

84 files changed

+3039
-3289
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+3039
-3289
lines changed

src/main/java/com/couchbase/client/java/Cluster.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public static Cluster connect(final Set<SeedNode> seedNodes, final ClusterOption
285285
notNull(options, "ClusterOptions");
286286

287287
final ClusterOptions.Built opts = options.build();
288-
return new Cluster(extractClusterEnvironment(null, opts), opts.authenticator(), seedNodes);
288+
return new Cluster(extractClusterEnvironment("", opts), opts.authenticator(), seedNodes);
289289
}
290290

291291
/**

src/main/java/com/couchbase/client/java/ClusterInterface.java

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import com.couchbase.client.java.search.SearchOptions;
4141
import com.couchbase.client.java.search.SearchQuery;
4242
import com.couchbase.client.java.search.result.SearchResult;
43+
import com.couchbase.client.java.transactions.Transactions;
44+
import org.springframework.data.couchbase.transaction.CouchbaseTransactionalOperator;
4345

4446
import java.time.Duration;
4547
import java.util.Set;
@@ -104,4 +106,6 @@ public interface ClusterInterface {
104106
void waitUntilReady(Duration timeout);
105107

106108
void waitUntilReady(Duration timeout, WaitUntilReadyOptions options);
109+
110+
Transactions transactions();
107111
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
/*
3+
* Copyright 2021 the original author or authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.couchbase.client.java.transactions;
18+
19+
import java.lang.reflect.Field;
20+
import java.lang.reflect.Method;
21+
import java.time.Duration;
22+
import java.time.temporal.ChronoUnit;
23+
import java.util.Optional;
24+
import java.util.UUID;
25+
26+
import com.couchbase.client.core.annotation.Stability;
27+
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
28+
import com.couchbase.client.core.transaction.CoreTransactionContext;
29+
import com.couchbase.client.core.transaction.CoreTransactionsReactive;
30+
import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
31+
import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
32+
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
33+
import com.couchbase.client.core.transaction.support.AttemptState;
34+
import com.couchbase.client.java.codec.JsonSerializer;
35+
import reactor.core.publisher.Mono;
36+
37+
/**
38+
* To access the ReactiveTransactionAttemptContext held by TransactionAttemptContext
39+
*
40+
* @author Michael Reiche
41+
*/
42+
public class AttemptContextReactiveAccessor {
43+
44+
public static ReactiveTransactionAttemptContext getACR(TransactionAttemptContext attemptContext) {
45+
// return attemptContext.ctx();
46+
// todo gp is this access needed. Could hold the raw CoreTransactionAttemptContext instead.
47+
return null;
48+
}
49+
50+
public static ReactiveTransactions reactive(Transactions transactions) {
51+
try {
52+
Field field = Transactions.class.getDeclaredField("reactive");
53+
field.setAccessible(true);
54+
return (ReactiveTransactions) field.get(transactions);
55+
} catch (Throwable err) {
56+
throw new RuntimeException(err);
57+
}
58+
}
59+
60+
public static ReactiveTransactionAttemptContext reactive(TransactionAttemptContext atr) {
61+
JsonSerializer serializer;
62+
try {
63+
Field field = TransactionAttemptContext.class.getDeclaredField("serializer");
64+
field.setAccessible(true);
65+
serializer = (JsonSerializer) field.get(atr);
66+
} catch (Throwable err) {
67+
throw new RuntimeException(err);
68+
}
69+
try {
70+
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
71+
field.setAccessible(true);
72+
return new ReactiveTransactionAttemptContext((CoreTransactionAttemptContext) field.get(atr), serializer);
73+
} catch (Throwable err) {
74+
throw new RuntimeException(err);
75+
}
76+
}
77+
78+
public static TransactionAttemptContext blocking(ReactiveTransactionAttemptContext atr) {
79+
JsonSerializer serializer;
80+
try {
81+
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("serializer");
82+
field.setAccessible(true);
83+
serializer = (JsonSerializer) field.get(atr);
84+
} catch (Throwable err) {
85+
throw new RuntimeException(err);
86+
}
87+
try {
88+
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("internal");
89+
field.setAccessible(true);
90+
return new TransactionAttemptContext((CoreTransactionAttemptContext) field.get(atr), serializer);
91+
} catch (Throwable err) {
92+
throw new RuntimeException(err);
93+
}
94+
}
95+
96+
public static CoreTransactionLogger getLogger(ReactiveTransactionAttemptContext attemptContextReactive) {
97+
// todo gp needed?
98+
return null;
99+
// return attemptContextReactive;
100+
}
101+
102+
// todo gp needed?
103+
@Stability.Internal
104+
public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(ReactiveTransactions transactions) {
105+
// PerTransactionConfig perConfig = PerTransactionConfigBuilder.create().build();
106+
// MergedTransactionConfig merged = new MergedTransactionConfig(transactions.config(), Optional.of(perConfig));
107+
//
108+
// TransactionContext overall = new TransactionContext(
109+
// transactions.cleanup().clusterData().cluster().environment().requestTracer(),
110+
// transactions.cleanup().clusterData().cluster().environment().eventBus(),
111+
// UUID.randomUUID().toString(), now(), Duration.ZERO, merged);
112+
113+
String txnId = UUID.randomUUID().toString();
114+
// overall.LOGGER.info(configDebug(transactions.config(), perConfig));
115+
116+
CoreTransactionsReactive coreTransactionsReactive;
117+
try {
118+
Field field = ReactiveTransactions.class.getDeclaredField("internal");
119+
field.setAccessible(true);
120+
coreTransactionsReactive = (CoreTransactionsReactive) field.get(transactions);
121+
} catch (Throwable err) {
122+
throw new RuntimeException(err);
123+
}
124+
125+
CoreTransactionOptions perConfig = new CoreTransactionOptions(Optional.empty(),
126+
Optional.empty(),
127+
Optional.empty(),
128+
Optional.of(Duration.ofMinutes(10)),
129+
Optional.empty(),
130+
Optional.empty());
131+
132+
CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(coreTransactionsReactive.config(),
133+
Optional.ofNullable(perConfig));
134+
CoreTransactionContext overall = new CoreTransactionContext(
135+
coreTransactionsReactive.core().context().environment().requestTracer(),
136+
coreTransactionsReactive.core().context().environment().eventBus(), UUID.randomUUID().toString(), merged,
137+
coreTransactionsReactive.core().transactionsCleanup());
138+
// overall.LOGGER.info(configDebug(config, perConfig, cleanup.clusterData().cluster().core()));
139+
140+
CoreTransactionAttemptContext coreTransactionAttemptContext = coreTransactionsReactive.createAttemptContext(overall,
141+
merged, txnId);
142+
return coreTransactionAttemptContext;
143+
// ReactiveTransactionAttemptContext reactiveTransactionAttemptContext = new ReactiveTransactionAttemptContext(
144+
// coreTransactionAttemptContext, null);
145+
// return reactiveTransactionAttemptContext;
146+
}
147+
148+
private static Duration now() {
149+
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
150+
}
151+
152+
public static ReactiveTransactionAttemptContext from(CoreTransactionAttemptContext coreTransactionAttemptContext,
153+
JsonSerializer serializer) {
154+
TransactionAttemptContext tac = new TransactionAttemptContext(coreTransactionAttemptContext, serializer);
155+
return reactive(tac);
156+
}
157+
158+
public static CoreTransactionAttemptContext getCore(ReactiveTransactionAttemptContext atr) {
159+
CoreTransactionAttemptContext coreTransactionsReactive;
160+
try {
161+
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("internal");
162+
field.setAccessible(true);
163+
coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
164+
} catch (Throwable err) {
165+
throw new RuntimeException(err);
166+
}
167+
return coreTransactionsReactive;
168+
}
169+
170+
public static CoreTransactionAttemptContext getCore(TransactionAttemptContext atr) {
171+
CoreTransactionAttemptContext coreTransactionsReactive;
172+
try {
173+
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
174+
field.setAccessible(true);
175+
coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
176+
} catch (Throwable err) {
177+
throw new RuntimeException(err);
178+
}
179+
return coreTransactionsReactive;
180+
}
181+
182+
public static Mono<Void> implicitCommit(ReactiveTransactionAttemptContext atr, boolean b) {
183+
CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr);
184+
try {
185+
// getDeclaredMethod() does not find it (because of primitive arg?)
186+
// CoreTransactionAttemptContext.class.getDeclaredMethod("implicitCommit", Boolean.class);
187+
Method[] methods = CoreTransactionAttemptContext.class.getDeclaredMethods();
188+
Method method = null;
189+
for(Method m:methods){
190+
if( m.getName().equals("implicitCommit")){
191+
method = m;
192+
break;
193+
}
194+
}
195+
if(method == null){
196+
throw new RuntimeException("did not find implicitCommit method");
197+
}
198+
method.setAccessible(true);
199+
return (Mono<Void>)method.invoke(coreTransactionsReactive, b);
200+
} catch (Throwable err) {
201+
throw new RuntimeException(err);
202+
}
203+
204+
}
205+
206+
public static AttemptState getState(ReactiveTransactionAttemptContext atr) {
207+
CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr);
208+
try {
209+
Field field = CoreTransactionAttemptContext.class.getDeclaredField("state");
210+
field.setAccessible(true);
211+
return (AttemptState) field.get(coreTransactionsReactive);
212+
} catch (Throwable err) {
213+
throw new RuntimeException(err);
214+
}
215+
}
216+
217+
public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
218+
return new ReactiveTransactionAttemptContext(core, jsonSerializer);
219+
}
220+
221+
// todo gp if needed let's expose in the SDK
222+
// static private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
223+
// StringBuilder sb = new StringBuilder();
224+
// sb.append("library version: ");
225+
// sb.append(TransactionsReactive.class.getPackage().getImplementationVersion());
226+
// sb.append(" config: ");
227+
// sb.append("atrs=");
228+
// sb.append(config.numAtrs());
229+
// sb.append(", metadataCollection=");
230+
// sb.append(config.metadataCollection());
231+
// sb.append(", expiry=");
232+
// sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis());
233+
// sb.append("msecs durability=");
234+
// sb.append(config.durabilityLevel());
235+
// sb.append(" per-txn config=");
236+
// sb.append(" durability=");
237+
// sb.append(perConfig.durabilityLevel());
238+
// sb.append(", supported=");
239+
// sb.append(Supported.SUPPORTED);
240+
// return sb.toString();
241+
// }
242+
243+
}

src/main/java/com/couchbase/transactions/AttemptContextReactiveAccessor.java

-93
This file was deleted.

src/main/java/com/example/demo/CouchbaseTransactionManager.java renamed to src/main/java/com/example/demo/CouchbaseTransactionManager.pre-core

+1-3
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@
44

55
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
66
import com.couchbase.client.java.transactions.TransactionAttemptContext;
7-
import com.couchbase.client.java.transactions.TransactionResult;
8-
import com.couchbase.transactions.AttemptContextReactiveAccessor;
7+
import com.couchbase.client.java.transactions.AttemptContextReactiveAccessor;
98
import org.slf4j.Logger;
109
import org.slf4j.LoggerFactory;
1110
import org.springframework.beans.factory.DisposableBean;
12-
import org.springframework.data.couchbase.CouchbaseClientFactory;
1311
import org.springframework.data.couchbase.core.CouchbaseTemplate;
1412
import org.springframework.data.couchbase.transaction.ClientSession;
1513
import org.springframework.data.couchbase.transaction.ClientSessionImpl;

0 commit comments

Comments
 (0)