diff --git a/.mvn/wrapper/maven-wrapper.jar b/.mvn/wrapper/maven-wrapper.jar
deleted file mode 100755
index 01e679973..000000000
Binary files a/.mvn/wrapper/maven-wrapper.jar and /dev/null differ
diff --git a/pom.xml b/pom.xml
index 6aac4a641..2a4ec9bca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,8 +18,8 @@
- 3.2.5
- 3.2.5
+ 3.3.0
+ 3.3.02.7.0-SNAPSHOTspring.data.couchbase
@@ -37,6 +37,7 @@
+
org.springframeworkspring-context-support
@@ -180,7 +181,14 @@
com.squareup.okhttp3okhttp
- 4.4.0
+ 4.8.1
+ test
+
+
+
+ com.squareup.okhttp3
+ okhttp-tls
+ 4.8.1test
@@ -212,6 +220,18 @@
4.0.3test
+
+ org.testcontainers
+ testcontainers
+
+
+
+ ch.qos.logback
+ logback-classic
+ 1.2.5
+ compile
+
+
@@ -230,10 +250,6 @@
false
-
- jitpack.io
- https://jitpack.io
-
@@ -265,6 +281,7 @@
org.apache.maven.pluginsmaven-failsafe-plugin
+ false**/*IntegrationTest.java**/*IntegrationTests.java
diff --git a/src/main/java/com/couchbase/client/java/Cluster.java b/src/main/java/com/couchbase/client/java/Cluster.java
new file mode 100644
index 000000000..23c588033
--- /dev/null
+++ b/src/main/java/com/couchbase/client/java/Cluster.java
@@ -0,0 +1,589 @@
+/*
+ * Copyright (c) 2018 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.client.java;
+
+import com.couchbase.client.core.Core;
+import com.couchbase.client.core.diagnostics.ClusterState;
+import com.couchbase.client.core.diagnostics.DiagnosticsResult;
+import com.couchbase.client.core.annotation.Stability;
+import com.couchbase.client.core.diagnostics.PingResult;
+import com.couchbase.client.core.env.Authenticator;
+import com.couchbase.client.core.env.PasswordAuthenticator;
+import com.couchbase.client.core.env.SeedNode;
+import com.couchbase.client.core.error.CouchbaseException;
+import com.couchbase.client.core.error.TimeoutException;
+import com.couchbase.client.core.msg.search.SearchRequest;
+import com.couchbase.client.java.analytics.AnalyticsOptions;
+import com.couchbase.client.java.analytics.AnalyticsResult;
+import com.couchbase.client.java.diagnostics.DiagnosticsOptions;
+import com.couchbase.client.java.diagnostics.PingOptions;
+import com.couchbase.client.java.diagnostics.WaitUntilReadyOptions;
+import com.couchbase.client.java.env.ClusterEnvironment;
+import com.couchbase.client.java.manager.analytics.AnalyticsIndexManager;
+import com.couchbase.client.java.manager.bucket.BucketManager;
+import com.couchbase.client.java.manager.eventing.EventingFunctionManager;
+import com.couchbase.client.java.manager.query.QueryIndexManager;
+import com.couchbase.client.java.manager.search.SearchIndexManager;
+import com.couchbase.client.java.manager.user.UserManager;
+import com.couchbase.client.java.query.QueryOptions;
+import com.couchbase.client.java.query.QueryResult;
+import com.couchbase.client.java.search.SearchOptions;
+import com.couchbase.client.java.search.SearchQuery;
+import com.couchbase.client.java.search.result.SearchResult;
+import com.couchbase.client.java.transactions.Transactions;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import static com.couchbase.client.core.util.Validators.notNull;
+import static com.couchbase.client.core.util.Validators.notNullOrEmpty;
+import static com.couchbase.client.java.AsyncCluster.extractClusterEnvironment;
+import static com.couchbase.client.java.AsyncCluster.seedNodesFromConnectionString;
+import static com.couchbase.client.java.AsyncUtils.block;
+import static com.couchbase.client.java.ClusterOptions.clusterOptions;
+import static com.couchbase.client.java.ReactiveCluster.DEFAULT_ANALYTICS_OPTIONS;
+import static com.couchbase.client.java.ReactiveCluster.DEFAULT_DIAGNOSTICS_OPTIONS;
+import static com.couchbase.client.java.ReactiveCluster.DEFAULT_QUERY_OPTIONS;
+import static com.couchbase.client.java.ReactiveCluster.DEFAULT_SEARCH_OPTIONS;
+
+/**
+ * The {@link Cluster} is the main entry point when connecting to a Couchbase cluster.
+ *
+ * Most likely you want to start out by using the {@link #connect(String, String, String)} entry point. For more
+ * advanced options you want to use the {@link #connect(String, ClusterOptions)} method. The entry point that allows
+ * overriding the seed nodes ({@link #connect(Set, ClusterOptions)} is only needed if you run a couchbase cluster
+ * at non-standard ports.
+ *
+ * See the individual connect methods for more information, but here is a snippet to get you off the ground quickly. It
+ * assumes you have Couchbase running locally and the "travel-sample" sample bucket loaded:
+ *
+ * //Connect and open a bucket
+ * Cluster cluster = Cluster.connect("127.0.0.1","Administrator","password");
+ * Bucket bucket = cluster.bucket("travel-sample");
+ * Collection collection = bucket.defaultCollection();
+ *
+ * // Perform a N1QL query
+ * QueryResult queryResult = cluster.query("select * from `travel-sample` limit 5");
+ * System.out.println(queryResult.rowsAsObject());
+ *
+ * // Perform a KV request and load a document
+ * GetResult getResult = collection.get("airline_10");
+ * System.out.println(getResult);
+ *
+ *
+ * When the application shuts down (or the SDK is not needed anymore), you are required to call {@link #disconnect()}.
+ * If you omit this step, the application will terminate (all spawned threads are daemon threads) but any operations
+ * or work in-flight will not be able to complete and lead to undesired side-effects. Note that disconnect will also
+ * shutdown all associated {@link Bucket buckets}.
+ *
+ * Cluster-level operations like {@link #query(String)} will not work unless at leas one bucket is opened against a
+ * pre 6.5 cluster. If you are using 6.5 or later, you can run cluster-level queries without opening a bucket. All
+ * of these operations are lazy, so the SDK will bootstrap in the background and service queries as quickly as possible.
+ * This also means that the first operations might be a bit slower until all sockets are opened in the background and
+ * the configuration is loaded. If you want to wait explicitly, you can utilize the {@link #waitUntilReady(Duration)}
+ * method before performing your first query.
+ *
+ * The SDK will only work against Couchbase Server 5.0 and later, because RBAC (role-based access control) is a first
+ * class concept since 3.0 and therefore required.
+ */
+// todo gpx as per discussion with miker - if required, ClusterInterface will be added to the SDK instead
+public class Cluster implements ClusterInterface {
+
+ /**
+ * Holds the underlying async cluster reference.
+ */
+ private final AsyncCluster asyncCluster;
+
+ /**
+ * Holds the adjacent reactive cluster reference.
+ */
+ private final ReactiveCluster reactiveCluster;
+
+ /**
+ * The search index manager manages search indexes.
+ */
+ private final SearchIndexManager searchIndexManager;
+
+ /**
+ * The user manager manages users and groups.
+ */
+ private final UserManager userManager;
+
+ /**
+ * The bucket manager manages buckets and allows to flush them.
+ */
+ private final BucketManager bucketManager;
+
+ /**
+ * Allows to manage query indexes.
+ */
+ private final QueryIndexManager queryIndexManager;
+
+ /**
+ * Allows to manage analytics indexes.
+ */
+ private final AnalyticsIndexManager analyticsIndexManager;
+
+ /**
+ * Allows to manage eventing functions.
+ */
+ private final EventingFunctionManager eventingFunctionManager;
+
+ /**
+ * Stores already opened buckets for reuse.
+ */
+ private final Map bucketCache = new ConcurrentHashMap<>();
+
+ /**
+ * Connect to a Couchbase cluster with a username and a password as credentials.
+ *
+ * This is the simplest (and recommended) method to connect to the cluster if you do not need to provide any
+ * custom options.
+ *
+ * The first argument (the connection string in its simplest form) is used to supply the hostnames of the cluster. In
+ * development it is OK to only pass in one hostname (or IP address), but in production we recommend passing in at
+ * least 3 nodes of the cluster (comma separated). The reason is that if one or more of the nodes are not reachable
+ * the client will still be able to bootstrap (and your application will become more resilient as a result).
+ *
+ * Here is how you specify one node to use for bootstrapping:
+ *
+ * Cluster cluster = Cluster.connect("127.0.0.1", "user", "password"); // ok during development
+ *
+ * This is what we recommend in production:
+ *
+ * Cluster cluster = Cluster.connect("host1,host2,host3", "user", "password"); // recommended in production
+ *
+ * It is important to understand that the SDK will only use the bootstrap ("seed nodes") host list to establish an
+ * initial contact with the cluster. Once the configuration is loaded this list is discarded and the client will
+ * connect to all nodes based on this configuration.
+ *
+ * This method will return immediately and the SDK will try to establish all the necessary resources and connections
+ * in the background. This means that depending on how fast it can be bootstrapped, the first couple cluster-level
+ * operations like {@link #query(String)} will take a bit longer. If you want to wait explicitly until those resources
+ * are available, you can use the {@link #waitUntilReady(Duration)} method before running any of them:
+ *
+ *
+ * @param connectionString connection string used to locate the Couchbase cluster.
+ * @param username the name of the user with appropriate permissions on the cluster.
+ * @param password the password of the user with appropriate permissions on the cluster.
+ * @return the instantiated {@link Cluster}.
+ */
+ public static Cluster connect(final String connectionString, final String username, final String password) {
+ return connect(connectionString, clusterOptions(PasswordAuthenticator.create(username, password)));
+ }
+
+ /**
+ * Connect to a Couchbase cluster with custom options.
+ *
+ * You likely want to use this over the simpler {@link #connect(String, String, String)} if:
+ *
+ *
A custom {@link ClusterEnvironment}
+ *
Or a custom {@link Authenticator}
+ *
+ * needs to be provided.
+ *
+ * A custom environment can be passed in like this:
+ *
+ * It is VERY important to shut down the environment when being passed in separately (as shown in
+ * the code sample above) and AFTER the cluster is disconnected. This will ensure an orderly shutdown
+ * and makes sure that no resources are left lingering.
+ *
+ * If you want to pass in a custom {@link Authenticator}, it is likely because you are setting up certificate-based
+ * authentication instead of using a username and a password directly. Remember to also enable TLS.
+ *
+ * This method will return immediately and the SDK will try to establish all the necessary resources and connections
+ * in the background. This means that depending on how fast it can be bootstrapped, the first couple cluster-level
+ * operations like {@link #query(String)} will take a bit longer. If you want to wait explicitly until those resources
+ * are available, you can use the {@link #waitUntilReady(Duration)} method before running any of them:
+ *
+ *
+ * @param connectionString connection string used to locate the Couchbase cluster.
+ * @param options custom options when creating the cluster.
+ * @return the instantiated {@link Cluster}.
+ */
+ public static Cluster connect(final String connectionString, final ClusterOptions options) {
+ notNullOrEmpty(connectionString, "ConnectionString");
+ notNull(options, "ClusterOptions");
+
+ final ClusterOptions.Built opts = options.build();
+ final Supplier environmentSupplier = extractClusterEnvironment(connectionString, opts);
+ return new Cluster(
+ environmentSupplier,
+ opts.authenticator(),
+ seedNodesFromConnectionString(connectionString, environmentSupplier.get())
+ );
+ }
+
+ /**
+ * Connect to a Couchbase cluster with a list of seed nodes and custom options.
+ *
+ * Note that you likely only want to use this method if you need to pass in custom ports for specific seed nodes
+ * during bootstrap. Otherwise we recommend relying on the simpler {@link #connect(String, String, String)} method
+ * instead.
+ *
+ * The following example shows how to bootstrap against a node with custom KV and management ports:
+ *
+ * @param seedNodes the seed nodes used to connect to the cluster.
+ * @param options custom options when creating the cluster.
+ * @return the instantiated {@link Cluster}.
+ */
+ public static Cluster connect(final Set seedNodes, final ClusterOptions options) {
+ notNullOrEmpty(seedNodes, "SeedNodes");
+ notNull(options, "ClusterOptions");
+
+ final ClusterOptions.Built opts = options.build();
+ return new Cluster(extractClusterEnvironment("", opts), opts.authenticator(), seedNodes);
+ }
+
+ /**
+ * Creates a new cluster from a {@link ClusterEnvironment}.
+ *
+ * @param environment the environment to use.
+ * @param authenticator the authenticator to use.
+ * @param seedNodes the seed nodes to bootstrap from.
+ */
+ private Cluster(final Supplier environment, final Authenticator authenticator,
+ final Set seedNodes) {
+ this.asyncCluster = new AsyncCluster(environment, authenticator, seedNodes);
+ this.reactiveCluster = new ReactiveCluster(asyncCluster);
+ this.searchIndexManager = new SearchIndexManager(asyncCluster.searchIndexes());
+ this.userManager = new UserManager(asyncCluster.users());
+ this.bucketManager = new BucketManager(asyncCluster.buckets());
+ this.queryIndexManager = new QueryIndexManager(asyncCluster.queryIndexes());
+ this.analyticsIndexManager = new AnalyticsIndexManager(this);
+ this.eventingFunctionManager = new EventingFunctionManager(asyncCluster.eventingFunctions());
+ }
+
+ /**
+ * Provides access to the related {@link AsyncCluster}.
+ *
+ * Note that the {@link AsyncCluster} is considered advanced API and should only be used to get the last drop
+ * of performance or if you are building higher-level abstractions on top. If in doubt, we recommend using the
+ * {@link #reactive()} API instead.
+ */
+ public AsyncCluster async() {
+ return asyncCluster;
+ }
+
+ /**
+ * Provides access to the related {@link ReactiveCluster}.
+ */
+ public ReactiveCluster reactive() {
+ return reactiveCluster;
+ }
+
+ /**
+ * Provides access to the underlying {@link Core}.
+ *
+ *
This is advanced and volatile API - it might change any time without notice. Use with care!
+ */
+ @Stability.Volatile
+ public Core core() {
+ return asyncCluster.core();
+ }
+
+ /**
+ * The user manager allows to manage users and groups.
+ */
+ public UserManager users() {
+ return userManager;
+ }
+
+ /**
+ * The bucket manager allows to perform administrative tasks on buckets and their resources.
+ */
+ public BucketManager buckets() {
+ return bucketManager;
+ }
+
+ /**
+ * The analytics index manager allows to modify and create indexes for the analytics service.
+ */
+ public AnalyticsIndexManager analyticsIndexes() {
+ return analyticsIndexManager;
+ }
+
+ /**
+ * The query index manager allows to modify and create indexes for the query service.
+ */
+ public QueryIndexManager queryIndexes() {
+ return queryIndexManager;
+ }
+
+ /**
+ * The search index manager allows to modify and create indexes for the search service.
+ */
+ public SearchIndexManager searchIndexes() {
+ return searchIndexManager;
+ }
+
+ /**
+ * Provides access to the eventing function management services.
+ */
+ @Stability.Uncommitted
+ public EventingFunctionManager eventingFunctions() {
+ return eventingFunctionManager;
+ }
+
+ /**
+ * Provides access to the used {@link ClusterEnvironment}.
+ */
+ public ClusterEnvironment environment() {
+ return asyncCluster.environment();
+ }
+
+ /**
+ * Performs a query against the query (N1QL) services.
+ *
+ * @param statement the N1QL query statement.
+ * @return the {@link QueryResult} once the response arrives successfully.
+ * @throws TimeoutException if the operation times out before getting a result.
+ * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
+ */
+ public QueryResult query(final String statement) {
+ return query(statement, DEFAULT_QUERY_OPTIONS);
+ }
+
+ /**
+ * Performs a query against the query (N1QL) services with custom options.
+ *
+ * @param statement the N1QL query statement as a raw string.
+ * @param options the custom options for this query.
+ * @return the {@link QueryResult} once the response arrives successfully.
+ * @throws TimeoutException if the operation times out before getting a result.
+ * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
+ */
+ public QueryResult query(final String statement, final QueryOptions options) {
+ return block(async().query(statement, options));
+ }
+
+ /**
+ * Performs an analytics query with default {@link AnalyticsOptions}.
+ *
+ * @param statement the query statement as a raw string.
+ * @return the {@link AnalyticsResult} once the response arrives successfully.
+ * @throws TimeoutException if the operation times out before getting a result.
+ * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
+ */
+ public AnalyticsResult analyticsQuery(final String statement) {
+ return analyticsQuery(statement, DEFAULT_ANALYTICS_OPTIONS);
+ }
+
+ /**
+ * Performs an analytics query with custom {@link AnalyticsOptions}.
+ *
+ * @param statement the query statement as a raw string.
+ * @param options the custom options for this query.
+ * @return the {@link AnalyticsResult} once the response arrives successfully.
+ * @throws TimeoutException if the operation times out before getting a result.
+ * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
+ */
+ public AnalyticsResult analyticsQuery(final String statement, final AnalyticsOptions options) {
+ return block(async().analyticsQuery(statement, options));
+ }
+
+ /**
+ * Performs a Full Text Search (FTS) query with default {@link SearchOptions}.
+ *
+ * @param query the query, in the form of a {@link SearchQuery}
+ * @return the {@link SearchRequest} once the response arrives successfully.
+ * @throws TimeoutException if the operation times out before getting a result.
+ * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
+ */
+ public SearchResult searchQuery(final String indexName, final SearchQuery query) {
+ return searchQuery(indexName, query, DEFAULT_SEARCH_OPTIONS);
+ }
+
+ /**
+ * Performs a Full Text Search (FTS) query with custom {@link SearchOptions}.
+ *
+ * @param query the query, in the form of a {@link SearchQuery}
+ * @param options the custom options for this query.
+ * @return the {@link SearchRequest} once the response arrives successfully.
+ * @throws TimeoutException if the operation times out before getting a result.
+ * @throws CouchbaseException for all other error reasons (acts as a base type and catch-all).
+ */
+ public SearchResult searchQuery(final String indexName, final SearchQuery query, final SearchOptions options) {
+ return block(asyncCluster.searchQuery(indexName, query, options));
+ }
+
+ /**
+ * Opens a {@link Bucket} with the given name.
+ *
+ * @param bucketName the name of the bucket to open.
+ * @return a {@link Bucket} once opened.
+ */
+ public Bucket bucket(final String bucketName) {
+ return bucketCache.computeIfAbsent(bucketName, n -> new Bucket(asyncCluster.bucket(n)));
+ }
+
+ /**
+ * Performs a non-reversible disconnect of this {@link Cluster}.
+ *
+ * If this method is used, the default disconnect timeout on the environment is used. Please use the companion
+ * overload ({@link #disconnect(Duration)} if you want to provide a custom duration.
+ *
+ * If a custom {@link ClusterEnvironment} has been passed in during connect, it is VERY important to
+ * shut it down after calling this method. This will prevent any in-flight tasks to be stopped prematurely.
+ */
+ public void disconnect() {
+ block(asyncCluster.disconnect());
+ }
+
+ /**
+ * Performs a non-reversible disconnect of this {@link Cluster}.
+ *
+ * If a custom {@link ClusterEnvironment} has been passed in during connect, it is VERY important to
+ * shut it down after calling this method. This will prevent any in-flight tasks to be stopped prematurely.
+ *
+ * @param timeout allows to override the default disconnect duration.
+ */
+ public void disconnect(final Duration timeout) {
+ block(asyncCluster.disconnect(timeout));
+ }
+
+ /**
+ * Runs a diagnostic report on the current state of the cluster from the SDKs point of view.
+ *
+ * Please note that it does not perform any I/O to do this, it will only use the current known state of the cluster
+ * to assemble the report (so, if for example no N1QL query has been run the socket pool might be empty and as
+ * result not show up in the report).
+ *
+ * @return the {@link DiagnosticsResult} once complete.
+ */
+ public DiagnosticsResult diagnostics() {
+ return block(asyncCluster.diagnostics(DEFAULT_DIAGNOSTICS_OPTIONS));
+ }
+
+ /**
+ * Runs a diagnostic report with custom options on the current state of the cluster from the SDKs point of view.
+ *
+ * Please note that it does not perform any I/O to do this, it will only use the current known state of the cluster
+ * to assemble the report (so, if for example no N1QL query has been run the socket pool might be empty and as
+ * result not show up in the report).
+ *
+ * @param options options that allow to customize the report.
+ * @return the {@link DiagnosticsResult} once complete.
+ */
+ public DiagnosticsResult diagnostics(final DiagnosticsOptions options) {
+ return block(asyncCluster.diagnostics(options));
+ }
+
+ /**
+ * Performs application-level ping requests against services in the couchbase cluster.
+ *
+ * Note that this operation performs active I/O against services and endpoints to assess their health. If you do
+ * not wish to perform I/O, consider using the {@link #diagnostics()} instead. You can also combine the functionality
+ * of both APIs as needed, which is {@link #waitUntilReady(Duration)} is doing in its implementation as well.
+ *
+ * @return the {@link PingResult} once complete.
+ */
+ public PingResult ping() {
+ return block(asyncCluster.ping());
+ }
+
+ /**
+ * Performs application-level ping requests with custom options against services in the couchbase cluster.
+ *
+ * Note that this operation performs active I/O against services and endpoints to assess their health. If you do
+ * not wish to perform I/O, consider using the {@link #diagnostics(DiagnosticsOptions)} instead. You can also combine
+ * the functionality of both APIs as needed, which is {@link #waitUntilReady(Duration)} is doing in its
+ * implementation as well.
+ *
+ * @return the {@link PingResult} once complete.
+ */
+ public PingResult ping(final PingOptions options) {
+ return block(asyncCluster.ping(options));
+ }
+
+ /**
+ * Waits until the desired {@link ClusterState} is reached.
+ *
+ * This method will wait until either the cluster state is "online", or the timeout is reached. Since the SDK is
+ * bootstrapping lazily, this method allows to eagerly check during bootstrap if all of the services are online
+ * and usable before moving on.
+ *
+ * @param timeout the maximum time to wait until readiness.
+ */
+ public void waitUntilReady(final Duration timeout) {
+ block(asyncCluster.waitUntilReady(timeout));
+ }
+
+ /**
+ * Waits until the desired {@link ClusterState} is reached.
+ *
+ * This method will wait until either the cluster state is "online" by default, or the timeout is reached. Since the
+ * SDK is bootstrapping lazily, this method allows to eagerly check during bootstrap if all of the services are online
+ * and usable before moving on. You can tune the properties through {@link WaitUntilReadyOptions}.
+ *
+ * @param timeout the maximum time to wait until readiness.
+ * @param options the options to customize the readiness waiting.
+ */
+ public void waitUntilReady(final Duration timeout, final WaitUntilReadyOptions options) {
+ block(asyncCluster.waitUntilReady(timeout, options));
+ }
+
+ /**
+ * Allows access to transactions.
+ *
+ * @return the {@link Transactions} interface.
+ */
+ @Stability.Uncommitted
+ public Transactions transactions() {
+ return new Transactions(core(), environment().jsonSerializer());
+ }
+}
+
diff --git a/src/main/java/com/couchbase/client/java/ClusterInterface.java b/src/main/java/com/couchbase/client/java/ClusterInterface.java
new file mode 100644
index 000000000..872a6efdf
--- /dev/null
+++ b/src/main/java/com/couchbase/client/java/ClusterInterface.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2018 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.client.java;
+
+import com.couchbase.client.core.Core;
+import com.couchbase.client.core.annotation.Stability;
+import com.couchbase.client.core.diagnostics.DiagnosticsResult;
+import com.couchbase.client.core.diagnostics.PingResult;
+import com.couchbase.client.core.env.Authenticator;
+import com.couchbase.client.core.env.PasswordAuthenticator;
+import com.couchbase.client.core.env.SeedNode;
+import com.couchbase.client.java.analytics.AnalyticsOptions;
+//import com.couchbase.client.java.analytics.AnalyticsResult;
+import com.couchbase.client.java.diagnostics.DiagnosticsOptions;
+import com.couchbase.client.java.diagnostics.PingOptions;
+import com.couchbase.client.java.diagnostics.WaitUntilReadyOptions;
+import com.couchbase.client.java.env.ClusterEnvironment;
+import com.couchbase.client.java.manager.analytics.AnalyticsIndexManager;
+import com.couchbase.client.java.manager.bucket.BucketManager;
+import com.couchbase.client.java.manager.eventing.EventingFunctionManager;
+import com.couchbase.client.java.manager.query.QueryIndexManager;
+import com.couchbase.client.java.manager.search.SearchIndexManager;
+import com.couchbase.client.java.manager.user.UserManager;
+import com.couchbase.client.java.query.QueryOptions;
+import com.couchbase.client.java.query.QueryResult;
+import com.couchbase.client.java.search.SearchOptions;
+import com.couchbase.client.java.search.SearchQuery;
+import com.couchbase.client.java.search.result.SearchResult;
+import com.couchbase.client.java.transactions.Transactions;
+import org.springframework.data.couchbase.transaction.CouchbaseTransactionalOperator;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static com.couchbase.client.core.util.Validators.notNull;
+import static com.couchbase.client.core.util.Validators.notNullOrEmpty;
+import static com.couchbase.client.java.AsyncCluster.extractClusterEnvironment;
+import static com.couchbase.client.java.AsyncCluster.seedNodesFromConnectionString;
+import static com.couchbase.client.java.ClusterOptions.clusterOptions;
+
+public interface ClusterInterface {
+
+ AsyncCluster async();
+
+ ReactiveCluster reactive();
+
+ @Stability.Volatile
+ Core core();
+
+ UserManager users();
+
+ BucketManager buckets();
+
+ AnalyticsIndexManager analyticsIndexes();
+
+ QueryIndexManager queryIndexes();
+
+ SearchIndexManager searchIndexes();
+
+ @Stability.Uncommitted
+ EventingFunctionManager eventingFunctions();
+
+ ClusterEnvironment environment();
+
+ QueryResult query(String statement);
+
+ QueryResult query(String statement, QueryOptions options);
+
+ //AnalyticsResult analyticsQuery(String statement);
+
+ // AnalyticsResult analyticsQuery(String statement, AnalyticsOptions options);
+
+ SearchResult searchQuery(String indexName, SearchQuery query);
+
+ SearchResult searchQuery(String indexName, SearchQuery query, SearchOptions options);
+
+ Bucket bucket(String bucketName);
+
+ void disconnect();
+
+ void disconnect(Duration timeout);
+
+ DiagnosticsResult diagnostics();
+
+ DiagnosticsResult diagnostics(DiagnosticsOptions options);
+
+ PingResult ping();
+
+ PingResult ping(PingOptions options);
+
+ void waitUntilReady(Duration timeout);
+
+ void waitUntilReady(Duration timeout, WaitUntilReadyOptions options);
+
+ Transactions transactions();
+}
diff --git a/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java b/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java
new file mode 100644
index 000000000..f0ffd69bd
--- /dev/null
+++ b/src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java
@@ -0,0 +1,223 @@
+/*
+/*
+ * Copyright 2021 the original author or authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.couchbase.client.java.transactions;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+import com.couchbase.client.core.annotation.Stability;
+import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
+import com.couchbase.client.core.transaction.CoreTransactionContext;
+import com.couchbase.client.core.transaction.CoreTransactionsReactive;
+import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
+import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
+import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
+import com.couchbase.client.core.transaction.support.AttemptState;
+import com.couchbase.client.java.codec.JsonSerializer;
+import reactor.core.publisher.Mono;
+import reactor.util.annotation.Nullable;
+
+/**
+ * To access the ReactiveTransactionAttemptContext held by TransactionAttemptContext
+ *
+ * @author Michael Reiche
+ */
+public class AttemptContextReactiveAccessor {
+
+ public static ReactiveTransactions reactive(Transactions transactions) {
+ try {
+ Field field = Transactions.class.getDeclaredField("reactive");
+ field.setAccessible(true);
+ return (ReactiveTransactions) field.get(transactions);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ }
+
+ public static ReactiveTransactionAttemptContext reactive(TransactionAttemptContext atr) {
+ JsonSerializer serializer;
+ try {
+ Field field = TransactionAttemptContext.class.getDeclaredField("serializer");
+ field.setAccessible(true);
+ serializer = (JsonSerializer) field.get(atr);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ return new ReactiveTransactionAttemptContext(getCore(atr), serializer);
+ }
+
+ public static TransactionAttemptContext blocking(ReactiveTransactionAttemptContext atr) {
+ JsonSerializer serializer;
+ try {
+ Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("serializer");
+ field.setAccessible(true);
+ serializer = (JsonSerializer) field.get(atr);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ return new TransactionAttemptContext(getCore(atr), serializer);
+ }
+
+ public static CoreTransactionLogger getLogger(ReactiveTransactionAttemptContext attemptContextReactive) {
+ return attemptContextReactive.logger();
+ }
+
+ public static CoreTransactionLogger getLogger(TransactionAttemptContext attemptContextReactive) {
+ return attemptContextReactive.logger();
+ }
+
+ // todo gp needed?
+ @Stability.Internal
+ public static CoreTransactionAttemptContext newCoreTranactionAttemptContext(ReactiveTransactions transactions) {
+
+ String txnId = UUID.randomUUID().toString();
+ CoreTransactionsReactive coreTransactionsReactive;
+ try {
+ Field field = ReactiveTransactions.class.getDeclaredField("internal");
+ field.setAccessible(true);
+ coreTransactionsReactive = (CoreTransactionsReactive) field.get(transactions);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+
+ CoreTransactionOptions perConfig = new CoreTransactionOptions(Optional.empty(), Optional.empty(), Optional.empty(),
+ Optional.of(Duration.ofMinutes(10)), Optional.empty(), Optional.empty());
+
+ CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(coreTransactionsReactive.config(),
+ Optional.ofNullable(perConfig));
+ CoreTransactionContext overall = new CoreTransactionContext(
+ coreTransactionsReactive.core().context().environment().requestTracer(),
+ coreTransactionsReactive.core().context().environment().eventBus(), UUID.randomUUID().toString(), merged,
+ coreTransactionsReactive.core().transactionsCleanup());
+
+ CoreTransactionAttemptContext coreTransactionAttemptContext = coreTransactionsReactive.createAttemptContext(overall,
+ merged, txnId);
+ return coreTransactionAttemptContext;
+ }
+
+ private static Duration now() {
+ return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
+ }
+
+ public static ReactiveTransactionAttemptContext from(CoreTransactionAttemptContext coreTransactionAttemptContext,
+ JsonSerializer serializer) {
+ TransactionAttemptContext tac = new TransactionAttemptContext(coreTransactionAttemptContext, serializer);
+ return reactive(tac);
+ }
+
+ public static CoreTransactionAttemptContext getCore(ReactiveTransactionAttemptContext atr) {
+ CoreTransactionAttemptContext coreTransactionsReactive;
+ try {
+ Field field = ReactiveTransactionAttemptContext.class.getDeclaredField("internal");
+ field.setAccessible(true);
+ coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ return coreTransactionsReactive;
+ }
+
+ public static CoreTransactionAttemptContext getCore(TransactionAttemptContext atr) {
+ try {
+ Field field = TransactionAttemptContext.class.getDeclaredField("internal");
+ field.setAccessible(true);
+ return (CoreTransactionAttemptContext) field.get(atr);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ }
+
+ public static Mono implicitCommit(ReactiveTransactionAttemptContext atr, boolean b) {
+ CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr);
+ try {
+ // getDeclaredMethod() does not find it (because of primitive arg?)
+ // CoreTransactionAttemptContext.class.getDeclaredMethod("implicitCommit", Boolean.class);
+ Method[] methods = CoreTransactionAttemptContext.class.getDeclaredMethods();
+ Method method = null;
+ for (Method m : methods) {
+ if (m.getName().equals("implicitCommit")) {
+ method = m;
+ break;
+ }
+ }
+ if (method == null) {
+ throw new RuntimeException("did not find implicitCommit method");
+ }
+ method.setAccessible(true);
+ return (Mono) method.invoke(coreTransactionsReactive, b);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+
+ }
+
+ public static AttemptState getState(ReactiveTransactionAttemptContext atr) {
+ CoreTransactionAttemptContext coreTransactionsReactive = getCore(atr);
+ try {
+ Field field = CoreTransactionAttemptContext.class.getDeclaredField("state");
+ field.setAccessible(true);
+ return (AttemptState) field.get(coreTransactionsReactive);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ }
+
+ public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(
+ CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
+ return new ReactiveTransactionAttemptContext(core, jsonSerializer);
+ }
+
+ public static CoreTransactionsReactive getCoreTransactionsReactive(ReactiveTransactions transactions) {
+ try {
+ Field field = ReactiveTransactions.class.getDeclaredField("internal");
+ field.setAccessible(true);
+ return (CoreTransactionsReactive) field.get(transactions);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ }
+
+ public static TransactionAttemptContext newTransactionAttemptContext(CoreTransactionAttemptContext ctx,
+ JsonSerializer jsonSerializer) {
+ return new TransactionAttemptContext(ctx, jsonSerializer);
+ }
+
+ public static TransactionResult run(Transactions transactions, Consumer transactionLogic, CoreTransactionOptions coreTransactionOptions) {
+ return reactive(transactions).runBlocking(transactionLogic, coreTransactionOptions);
+ }
+
+ CoreTransactionAttemptContext coreTransactionsReactive;
+ try {
+ Field field = TransactionAttemptContext.class.getDeclaredField("internal");
+ field.setAccessible(true);
+ coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
+ } catch (Throwable err) {
+ throw new RuntimeException(err);
+ }
+ return coreTransactionsReactive;
+ }
+
+ public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
+ return new ReactiveTransactionAttemptContext(core, jsonSerializer);
+ }
+}
diff --git a/src/main/java/com/couchbase/transactions/TransactionsReactive.java b/src/main/java/com/couchbase/transactions/TransactionsReactive.java
new file mode 100644
index 000000000..352135ead
--- /dev/null
+++ b/src/main/java/com/couchbase/transactions/TransactionsReactive.java
@@ -0,0 +1,753 @@
+///*
+// * Copyright 2021 Couchbase, Inc.
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package com.couchbase.transactions;
+//
+//import com.couchbase.client.core.annotation.Stability;
+//import com.couchbase.client.core.cnc.EventBus;
+//import com.couchbase.client.core.retry.reactor.DefaultRetry;
+//import com.couchbase.client.core.retry.reactor.Jitter;
+//import com.couchbase.client.core.retry.reactor.RetryContext;
+//import com.couchbase.client.java.Cluster;
+//import com.couchbase.client.java.ReactiveCollection;
+//import com.couchbase.client.java.ReactiveScope;
+//import com.couchbase.client.java.json.JsonObject;
+//import com.couchbase.client.java.query.ReactiveQueryResult;
+//import com.couchbase.transactions.cleanup.ClusterData;
+//import com.couchbase.transactions.cleanup.TransactionsCleanup;
+//import com.couchbase.transactions.components.ATR;
+//import com.couchbase.transactions.components.ActiveTransactionRecord;
+//import com.couchbase.transactions.config.MergedTransactionConfig;
+//import com.couchbase.transactions.config.PerTransactionConfig;
+//import com.couchbase.transactions.config.PerTransactionConfigBuilder;
+//import com.couchbase.transactions.config.SingleQueryTransactionConfig;
+//import com.couchbase.transactions.config.SingleQueryTransactionConfigBuilder;
+//import com.couchbase.transactions.config.TransactionConfig;
+//import com.couchbase.transactions.deferred.TransactionSerializedContext;
+//import com.couchbase.transactions.error.TransactionCommitAmbiguous;
+//import com.couchbase.transactions.error.TransactionExpired;
+//import com.couchbase.transactions.error.TransactionFailedException;
+//import com.couchbase.transactions.error.internal.ErrorClasses;
+//import com.couchbase.transactions.error.external.TransactionOperationFailed;
+//import com.couchbase.transactions.forwards.Supported;
+//import com.couchbase.transactions.log.EventBusPersistedLogger;
+//import com.couchbase.transactions.log.PersistedLogWriter;
+//import com.couchbase.transactions.log.TransactionLogEvent;
+//import com.couchbase.transactions.support.AttemptContextFactory;
+//import com.couchbase.transactions.support.AttemptState;
+//import com.couchbase.transactions.support.OptionsWrapperUtil;
+//import com.couchbase.transactions.util.DebugUtil;
+//import reactor.core.publisher.Mono;
+//import reactor.core.scheduler.Schedulers;
+//
+//import java.time.Duration;
+//import java.time.temporal.ChronoUnit;
+//import java.util.Objects;
+//import java.util.Optional;
+//import java.util.UUID;
+//import java.util.concurrent.TimeUnit;
+//import java.util.concurrent.atomic.AtomicReference;
+//import java.util.function.Consumer;
+//import java.util.function.Function;
+//import java.util.function.Predicate;
+//
+//import static com.couchbase.transactions.error.internal.TransactionOperationFailedBuilder.createError;
+//import static com.couchbase.transactions.log.PersistedLogWriter.MAX_LOG_ENTRIES_DEFAULT;
+//import static com.couchbase.transactions.support.SpanWrapperUtil.DB_COUCHBASE_TRANSACTIONS;
+//
+///**
+// * An asynchronous version of {@link Transactions}, allowing transactions to be created and run in an asynchronous
+// * manner.
+// *
+// * The main method to run transactions is {@link TransactionsReactive#run}.
+// */
+//public class TransactionsReactive {
+// static final int MAX_ATTEMPTS = 1000;
+// private final TransactionsCleanup cleanup;
+// private final TransactionConfig config;
+// private AttemptContextFactory attemptContextFactory;
+// private EventBusPersistedLogger persistedLogger;
+//
+// /**
+// * This is package-private. Applications should create a {@link Transactions} object instead, and then call {@link
+// * Transactions#reactive}.
+// */
+// static TransactionsReactive create(Cluster cluster, TransactionConfig config) {
+// return new TransactionsReactive(cluster, config);
+// }
+//
+// private TransactionsReactive(Cluster cluster, TransactionConfig config) {
+// Objects.requireNonNull(cluster);
+// Objects.requireNonNull(config);
+//
+// ClusterData clusterData = new ClusterData(cluster);
+// this.config = config;
+// this.attemptContextFactory = config.attemptContextFactory();
+// MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.empty());
+// cleanup = new TransactionsCleanup(merged, clusterData);
+//
+// config.persistentLoggingCollection().ifPresent(collection -> {
+// PersistedLogWriter persistedLogWriter = new PersistedLogWriter(collection, MAX_LOG_ENTRIES_DEFAULT);
+// persistedLogger = new EventBusPersistedLogger(cluster.environment().eventBus(), persistedLogWriter, merged);
+// });
+// }
+//
+//
+// /**
+// * The main transactions 'engine', responsible for attempting the transaction logic as many times as required,
+// * until the transaction commits, is explicitly rolled back, or expires.
+// */
+// // TODO: changed from private to public. package-protected plus an accessor would be ok to
+// public Mono executeTransaction(MergedTransactionConfig config,
+// TransactionContext overall,
+// Mono transactionLogic) {
+// AtomicReference startTime = new AtomicReference<>();
+//
+// return Mono.just(overall)
+//
+// .subscribeOn(reactor.core.scheduler.Schedulers.elastic())
+//
+// .doOnSubscribe(v -> {
+// if (startTime.get() == null) startTime.set(System.nanoTime());
+// })
+//
+// // Where the magic happens: execute the app's transaction logic
+// // A ReactiveTransactionAttemptContext gets created in here. Rollback requires one of these (so it knows what
+// // to rollback), so only errors thrown inside this block can trigger rollback.
+// // So, expiry checks only get done inside this block.
+// .then(transactionLogic)
+//
+// .flatMap(this::executeImplicitCommit)
+//
+// // Track an attempt if non-error, and request that the attempt be cleaned up. Similar logic is also
+// // done in executeHandleErrorsPreRetry.
+// .doOnNext(ctx -> executeAddAttemptAndCleanupRequest(config, overall, ctx))
+//
+// // Track an attempt if error, and perform rollback if needed.
+// // All errors reaching here must be a `TransactionOperationFailed`.
+// .onErrorResume(err -> executeHandleErrorsPreRetry(config, overall, err))
+//
+// // This is the main place to retry txns. Feed all errors up to this centralised point.
+// // All errors reaching here must be a `TransactionOperationFailed`.
+// .retryWhen(executeCreateRetryWhen(overall))
+//
+// // If we're here, then we've hit an error that we don't want to retry.
+// // Either raise some derivative of TransactionFailedException to the app, or return an ReactiveTransactionAttemptContext
+// // to return success (some errors result in success, e.g. TRANSACTION_FAILED_POST_COMMIT)
+// // All errors reaching here must be an `ErrorWrapper`.
+// .onErrorResume(err -> executeHandleErrorsPostRetry(overall, err))
+//
+// .doOnError(err -> {
+// if (config.logOnFailure() && !config.logDirectly()) {
+// EventBus eventBus = cleanup.clusterData().cluster().environment().eventBus();
+// overall.LOGGER.logs().forEach(log -> {
+// eventBus.publish(new TransactionLogEvent(config.logOnFailureLevel(),
+// TransactionLogEvent.DEFAULT_CATEGORY, log.toString()));
+// });
+// }
+// })
+//
+// // If we get here, success
+// .doOnSuccess(v ->
+// overall.LOGGER.info("finished txn in %dus",
+// TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime.get()))
+// )
+//
+// // Safe to do single() as there will only ever be 1 result
+// .single()
+// .map(v -> createResultFromContext(overall));
+// }
+//
+// private reactor.util.retry.Retry executeCreateRetryWhen(TransactionContext overall) {
+// Predicate super RetryContext