Skip to content

Datacouch 1145 transaction support #1448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,6 @@ public static ReactiveTransactionAttemptContext reactive(TransactionAttemptConte
return new ReactiveTransactionAttemptContext(getCore(atr), serializer);
}

public static TransactionAttemptContext blocking(ReactiveTransactionAttemptContext atr) {
JsonSerializer serializer;
try {
Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("serializer");
field.setAccessible(true);
serializer = (JsonSerializer) field.get(atr);
} catch (Throwable err) {
throw new RuntimeException(err);
}
return new TransactionAttemptContext(getCore(atr), serializer);
}

public static CoreTransactionLogger getLogger(ReactiveTransactionAttemptContext attemptContextReactive) {
return attemptContextReactive.logger();
}

public static CoreTransactionLogger getLogger(TransactionAttemptContext attemptContextReactive) {
return attemptContextReactive.logger();
}
Expand All @@ -100,6 +84,7 @@ public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(Reac
throw new RuntimeException(err);
}

// todo gpx options need to be loaded from Cluster and TransactionOptions (from TransactionsWrapper)
CoreTransactionOptions perConfig = new CoreTransactionOptions(Optional.empty(), Optional.empty(), Optional.empty(),
Optional.of(Duration.ofMinutes(10)), Optional.empty(), Optional.empty());

Expand All @@ -115,16 +100,6 @@ public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(Reac
return coreTransactionAttemptContext;
}

private static Duration now() {
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
}

public static ReactiveTransactionAttemptContext from(CoreTransactionAttemptContext coreTransactionAttemptContext,
JsonSerializer serializer) {
TransactionAttemptContext tac = new TransactionAttemptContext(coreTransactionAttemptContext, serializer);
return reactive(tac);
}

public static CoreTransactionAttemptContext getCore(ReactiveTransactionAttemptContext atr) {
CoreTransactionAttemptContext coreTransactionsReactive;
try {
Expand All @@ -147,61 +122,11 @@ public static CoreTransactionAttemptContext getCore(TransactionAttemptContext at
}
}

public static Mono<Void> implicitCommit(ReactiveTransactionAttemptContext atr, boolean b) {
CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr);
try {
// getDeclaredMethod() does not find it (because of primitive arg?)
// CoreTransactionAttemptContext.class.getDeclaredMethod("implicitCommit", Boolean.class);
Method[] methods = CoreTransactionAttemptContext.class.getDeclaredMethods();
Method method = null;
for (Method m : methods) {
if (m.getName().equals("implicitCommit")) {
method = m;
break;
}
}
if (method == null) {
throw new RuntimeException("did not find implicitCommit method");
}
method.setAccessible(true);
return (Mono<Void>) method.invoke(coreTransactionsReactive, b);
} catch (Throwable err) {
throw new RuntimeException(err);
}

}

public static AttemptState getState(ReactiveTransactionAttemptContext atr) {
CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr);
try {
Field field = CoreTransactionAttemptContext.class.getDeclaredField("state");
field.setAccessible(true);
return (AttemptState) field.get(coreTransactionsReactive);
} catch (Throwable err) {
throw new RuntimeException(err);
}
}

public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(
CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
return new ReactiveTransactionAttemptContext(core, jsonSerializer);
}

public static CoreTransactionsReactive getCoreTransactionsReactive(ReactiveTransactions transactions) {
try {
Field field = ReactiveTransactions.class.getDeclaredField("internal");
field.setAccessible(true);
return (CoreTransactionsReactive) field.get(transactions);
} catch (Throwable err) {
throw new RuntimeException(err);
}
}

public static TransactionAttemptContext newTransactionAttemptContext(CoreTransactionAttemptContext ctx,
JsonSerializer jsonSerializer) {
return new TransactionAttemptContext(ctx, jsonSerializer);
}

public static TransactionResult run(Transactions transactions, Consumer<TransactionAttemptContext> transactionLogic, CoreTransactionOptions coreTransactionOptions) {
return reactive(transactions).runBlocking(transactionLogic, coreTransactionOptions);
}
Expand Down
Loading