From 9264cfcd3854982c3a30ae3af13f7f04411a8b04 Mon Sep 17 00:00:00 2001 From: Oleg Kuznetsov Date: Mon, 10 Jan 2022 18:19:21 +0300 Subject: [PATCH 1/2] Add method for reconnection --- .../tarantool/driver/api/TarantoolClient.java | 12 +++ .../driver/core/AbstractTarantoolClient.java | 5 ++ .../driver/core/ProxyTarantoolClient.java | 5 ++ .../driver/core/RetryingTarantoolClient.java | 5 ++ .../AbstractTarantoolConnectionManager.java | 5 ++ .../TarantoolConnectionManager.java | 7 ++ .../integration/ClusterDiscoveryIT.java | 4 +- .../driver/integration/ReconnectIT.java | 86 +++++++++++++++++++ src/test/resources/cartridge/instances.yml | 5 ++ src/test/resources/cartridge/topology.lua | 40 +++++---- 10 files changed, 154 insertions(+), 20 deletions(-) create mode 100644 src/test/java/io/tarantool/driver/integration/ReconnectIT.java diff --git a/src/main/java/io/tarantool/driver/api/TarantoolClient.java b/src/main/java/io/tarantool/driver/api/TarantoolClient.java index c1f41ee97..7e2833bac 100644 --- a/src/main/java/io/tarantool/driver/api/TarantoolClient.java +++ b/src/main/java/io/tarantool/driver/api/TarantoolClient.java @@ -26,12 +26,14 @@ public interface TarantoolClient> /** * Get the Tarantool client config passed to this client + * * @return {@link TarantoolClientConfig} instance */ TarantoolClientConfig getConfig(); /** * Get the Tarantool server version + * * @return {@link TarantoolVersion} * @throws TarantoolClientException if the client is not connected */ @@ -39,6 +41,7 @@ public interface TarantoolClient> /** * Provides CRUD and other operations for a Tarantool space + * * @param spaceName name of the space, must not be null or empty * @return Tarantool space operations interface * @throws TarantoolClientException if the client is not connected @@ -47,6 +50,7 @@ public interface TarantoolClient> /** * Provides CRUD and other operations for a Tarantool space + * * @param spaceId ID of the space, must be greater than 0 * @return Tarantool space operations implementation * @throws TarantoolClientException if the client is not connected @@ -55,6 +59,7 @@ public interface TarantoolClient> /** * Provides operations for Tarantool spaces and indexes metadata + * * @return Tarantool metadata operations implementation * @throws TarantoolClientException if the client is not connected */ @@ -67,4 +72,11 @@ public interface TarantoolClient> * @return connection listeners */ TarantoolConnectionListeners getConnectionListeners(); + + /** + * Starts the process of establishing lacking connections to each host + * + * @return returns true if the establishing process has been started, else false + */ + boolean establishLackingConnections(); } diff --git a/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java b/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java index c310b1a66..f94990235 100644 --- a/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java +++ b/src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java @@ -148,6 +148,11 @@ private TarantoolConnectionManager connectionManager() { return this.connectionManager; } + @Override + public boolean establishLackingConnections() { + return connectionManager.establishLackingConnections(); + } + @Override public TarantoolVersion getVersion() throws TarantoolClientException { try { diff --git a/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java b/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java index f1f709587..67d53efe0 100644 --- a/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java +++ b/src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java @@ -416,6 +416,11 @@ public CompletableFuture> eval(String expression, return client.eval(expression, arguments, argumentsMapper, resultMapper); } + @Override + public boolean establishLackingConnections() { + return this.client.establishLackingConnections(); + } + @Override public void close() throws Exception { this.client.close(); diff --git a/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java b/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java index ecdcef651..5c50c51d0 100644 --- a/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java +++ b/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java @@ -378,6 +378,11 @@ public CompletableFuture> eval(String expression, List arguments, return wrapOperation(() -> client.eval(expression, arguments, argumentsMapper, resultMapper)); } + @Override + public boolean establishLackingConnections() { + return this.client.establishLackingConnections(); + } + @Override public void close() throws Exception { client.close(); diff --git a/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java b/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java index 6f4214e2c..b6bb7f479 100644 --- a/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java +++ b/src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java @@ -104,6 +104,11 @@ public CompletableFuture getConnection() { }); } + @Override + public boolean establishLackingConnections() { + return connectionMode.compareAndSet(ConnectionMode.OFF, ConnectionMode.PARTIAL); + } + private CompletableFuture getConnectionInternal() { CompletableFuture result; ConnectionMode currentMode = connectionMode.get(); diff --git a/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java b/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java index 4a1f4c63c..e64968242 100644 --- a/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java +++ b/src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java @@ -18,4 +18,11 @@ public interface TarantoolConnectionManager extends AutoCloseable { * @return a future with next connection in order */ CompletableFuture getConnection(); + + /** + * Starts the process of establishing lacking connections to each host + * + * @return returns true if the establishing process has been started, else false + */ + boolean establishLackingConnections(); } diff --git a/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java b/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java index adf651f8e..776bd91aa 100644 --- a/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java +++ b/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java @@ -42,7 +42,7 @@ public void httpClusterDiscovererTest() throws TarantoolClientException { HTTPDiscoveryClusterAddressProvider addressProvider = getHttpProvider(); Collection nodes = addressProvider.getAddresses(); - assertEquals(nodes.size(), 2); + assertEquals(nodes.size(), 3); Set nodeSet = new HashSet<>(nodes); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI))); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI))); @@ -68,7 +68,7 @@ public void binaryClusterDiscovererTest() { TarantoolClusterAddressProvider addressProvider = getBinaryProvider(); Collection nodes = addressProvider.getAddresses(); - assertEquals(nodes.size(), 2); + assertEquals(nodes.size(), 3); Set nodeSet = new HashSet<>(nodes); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI))); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI))); diff --git a/src/test/java/io/tarantool/driver/integration/ReconnectIT.java b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java new file mode 100644 index 000000000..ceb009df6 --- /dev/null +++ b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java @@ -0,0 +1,86 @@ +package io.tarantool.driver.integration; + +import io.tarantool.driver.api.TarantoolClient; +import io.tarantool.driver.api.TarantoolClientFactory; +import io.tarantool.driver.api.TarantoolResult; +import io.tarantool.driver.api.TarantoolServerAddress; +import io.tarantool.driver.api.tuple.TarantoolTuple; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.output.WaitingConsumer; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Testcontainers +public class ReconnectIT extends SharedCartridgeContainer { + + public static String USER_NAME; + public static String PASSWORD; + + @BeforeAll + public static void setUp() throws TimeoutException { + startCluster(); + + WaitingConsumer waitingConsumer = new WaitingConsumer(); + container.followOutput(waitingConsumer); + waitingConsumer.waitUntil(f -> f.getUtf8String().contains("The cluster is balanced ok")); + + USER_NAME = container.getUsername(); + PASSWORD = container.getPassword(); + } + + @Test + public void test_should_reconnect_ifReconnectIsInvoked() throws Exception { + //when + TarantoolClient> client = + TarantoolClientFactory.createClient() + .withAddresses( + new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3301)), + new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3311)), + new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3312)) + ) + .withCredentials(USER_NAME, PASSWORD) + .build(); + + final Set routerUuids = getInstancesUuids(client); + + // stop routers + container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "router"); + container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "second-router"); + + assertEquals(client.eval("return box.info().uuid").join().get(0), + client.eval("return box.info().uuid").join().get(0)); + + // start routers + container.execInContainer("cartridge", "start", "--run-dir=/tmp/run", "--data-dir=/tmp/data", "-d"); + + client.establishLackingConnections(); + Thread.sleep(3000); + + final Set uuidsAfterReconnect = getInstancesUuids(client); + + assertEquals(routerUuids.size(), uuidsAfterReconnect.size()); + } + + private Set getInstancesUuids(TarantoolClient> client) { + final Set routerUuids = new HashSet<>(); + String firstUuid = getInstanceUuid(client); + routerUuids.add(firstUuid); + + String currentUuid = ""; + while (!firstUuid.equals(currentUuid)) { + currentUuid = getInstanceUuid(client); + routerUuids.add(currentUuid); + } + return routerUuids; + } + + private String getInstanceUuid(TarantoolClient> client) { + return (String) client.eval("return box.info().uuid").join().get(0); + } +} diff --git a/src/test/resources/cartridge/instances.yml b/src/test/resources/cartridge/instances.yml index 34c80834c..5d2f8aeec 100644 --- a/src/test/resources/cartridge/instances.yml +++ b/src/test/resources/cartridge/instances.yml @@ -8,6 +8,11 @@ testapp.second-router: advertise_uri: localhost:3311 http_port: 8091 +testapp.third-router: + workdir: ./tmp/db_dev/3312 + advertise_uri: localhost:3312 + http_port: 8092 + testapp.s1-master: workdir: ./tmp/db_dev/3302 advertise_uri: localhost:3302 diff --git a/src/test/resources/cartridge/topology.lua b/src/test/resources/cartridge/topology.lua index cd1003252..9f7889e99 100644 --- a/src/test/resources/cartridge/topology.lua +++ b/src/test/resources/cartridge/topology.lua @@ -1,19 +1,23 @@ cartridge = require('cartridge') -replicasets = {{ - alias = 'app-router', - roles = {'vshard-router', 'app.roles.custom', 'app.roles.api_router'}, - join_servers = {{uri = 'localhost:3301'}} -}, { - alias = 'app-router-second', - roles = {'vshard-router', 'app.roles.custom', 'app.roles.api_router'}, - join_servers = {{uri = 'localhost:3311'}} -}, { - alias = 's1-storage', - roles = {'vshard-storage', 'app.roles.api_storage'}, - join_servers = {{uri = 'localhost:3302'}, {uri = 'localhost:3303'}} -}, { - alias = 's2-storage', - roles = {'vshard-storage', 'app.roles.api_storage'}, - join_servers = {{uri = 'localhost:3304'}, {uri = 'localhost:3305'}} -}} -return cartridge.admin_edit_topology({replicasets = replicasets}) +replicasets = { { + alias = 'app-router', + roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, + join_servers = { { uri = 'localhost:3301' } } + }, { + alias = 'app-router-second', + roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, + join_servers = { { uri = 'localhost:3311' } } + }, { + alias = 'app-router-third', + roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, + join_servers = { { uri = 'localhost:3312' } } + }, { + alias = 's1-storage', + roles = { 'vshard-storage', 'app.roles.api_storage' }, + join_servers = { { uri = 'localhost:3302' }, { uri = 'localhost:3303' } } + }, { + alias = 's2-storage', + roles = { 'vshard-storage', 'app.roles.api_storage' }, + join_servers = { { uri = 'localhost:3304' }, { uri = 'localhost:3305' } } + } } +return cartridge.admin_edit_topology({ replicasets = replicasets }) From 4459c1ce672475d69fe13163f85a03f09b4f644b Mon Sep 17 00:00:00 2001 From: Oleg Kuznetsov Date: Wed, 12 Jan 2022 15:50:38 +0300 Subject: [PATCH 2/2] Added comments for tests --- .../driver/integration/ReconnectIT.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/test/java/io/tarantool/driver/integration/ReconnectIT.java b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java index ceb009df6..2dd38bb0b 100644 --- a/src/test/java/io/tarantool/driver/integration/ReconnectIT.java +++ b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java @@ -19,8 +19,8 @@ @Testcontainers public class ReconnectIT extends SharedCartridgeContainer { - public static String USER_NAME; - public static String PASSWORD; + private static String USER_NAME; + private static String PASSWORD; @BeforeAll public static void setUp() throws TimeoutException { @@ -45,16 +45,18 @@ public void test_should_reconnect_ifReconnectIsInvoked() throws Exception { new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3312)) ) .withCredentials(USER_NAME, PASSWORD) + .withConnections(10) .build(); + // getting all routers uuids final Set routerUuids = getInstancesUuids(client); // stop routers container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "router"); container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "second-router"); - assertEquals(client.eval("return box.info().uuid").join().get(0), - client.eval("return box.info().uuid").join().get(0)); + // check that there is only one instance left + assertEquals(getInstanceUuid(client), getInstanceUuid(client)); // start routers container.execInContainer("cartridge", "start", "--run-dir=/tmp/run", "--data-dir=/tmp/data", "-d"); @@ -62,14 +64,23 @@ public void test_should_reconnect_ifReconnectIsInvoked() throws Exception { client.establishLackingConnections(); Thread.sleep(3000); + // getting all routers uuids after restarting final Set uuidsAfterReconnect = getInstancesUuids(client); + // check that amount of routers is equal initial amount assertEquals(routerUuids.size(), uuidsAfterReconnect.size()); } + /** + * Return all instances uuids from cluster, using round robin connection selection strategy + * + * @param client Tarantool client + * @return set of instances uuids from cluster + */ private Set getInstancesUuids(TarantoolClient> client) { - final Set routerUuids = new HashSet<>(); String firstUuid = getInstanceUuid(client); + + final Set routerUuids = new HashSet<>(); routerUuids.add(firstUuid); String currentUuid = ""; @@ -77,6 +88,7 @@ private Set getInstancesUuids(TarantoolClient