Skip to content

Commit 88e57b7

Browse files
authored
Add method for reconnection (#160)
* Add method for reconnection * Added comments for tests
1 parent 79c36ee commit 88e57b7

File tree

10 files changed

+166
-20
lines changed

10 files changed

+166
-20
lines changed

src/main/java/io/tarantool/driver/api/TarantoolClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,22 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
2626

2727
/**
2828
* Get the Tarantool client config passed to this client
29+
*
2930
* @return {@link TarantoolClientConfig} instance
3031
*/
3132
TarantoolClientConfig getConfig();
3233

3334
/**
3435
* Get the Tarantool server version
36+
*
3537
* @return {@link TarantoolVersion}
3638
* @throws TarantoolClientException if the client is not connected
3739
*/
3840
TarantoolVersion getVersion() throws TarantoolClientException;
3941

4042
/**
4143
* Provides CRUD and other operations for a Tarantool space
44+
*
4245
* @param spaceName name of the space, must not be null or empty
4346
* @return Tarantool space operations interface
4447
* @throws TarantoolClientException if the client is not connected
@@ -47,6 +50,7 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
4750

4851
/**
4952
* Provides CRUD and other operations for a Tarantool space
53+
*
5054
* @param spaceId ID of the space, must be greater than 0
5155
* @return Tarantool space operations implementation
5256
* @throws TarantoolClientException if the client is not connected
@@ -55,6 +59,7 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
5559

5660
/**
5761
* Provides operations for Tarantool spaces and indexes metadata
62+
*
5863
* @return Tarantool metadata operations implementation
5964
* @throws TarantoolClientException if the client is not connected
6065
*/
@@ -67,4 +72,11 @@ public interface TarantoolClient<T extends Packable, R extends Collection<T>>
6772
* @return connection listeners
6873
*/
6974
TarantoolConnectionListeners getConnectionListeners();
75+
76+
/**
77+
* Starts the process of establishing lacking connections to each host
78+
*
79+
* @return returns true if the establishing process has been started, else false
80+
*/
81+
boolean establishLackingConnections();
7082
}

src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ private TarantoolConnectionManager connectionManager() {
148148
return this.connectionManager;
149149
}
150150

151+
@Override
152+
public boolean establishLackingConnections() {
153+
return connectionManager.establishLackingConnections();
154+
}
155+
151156
@Override
152157
public TarantoolVersion getVersion() throws TarantoolClientException {
153158
try {

src/main/java/io/tarantool/driver/core/ProxyTarantoolClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,11 @@ public CompletableFuture<List<?>> eval(String expression,
416416
return client.eval(expression, arguments, argumentsMapper, resultMapper);
417417
}
418418

419+
@Override
420+
public boolean establishLackingConnections() {
421+
return this.client.establishLackingConnections();
422+
}
423+
419424
@Override
420425
public void close() throws Exception {
421426
this.client.close();

src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,11 @@ public CompletableFuture<List<?>> eval(String expression, List<?> arguments,
378378
return wrapOperation(() -> client.eval(expression, arguments, argumentsMapper, resultMapper));
379379
}
380380

381+
@Override
382+
public boolean establishLackingConnections() {
383+
return this.client.establishLackingConnections();
384+
}
385+
381386
@Override
382387
public void close() throws Exception {
383388
client.close();

src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ public CompletableFuture<TarantoolConnection> getConnection() {
104104
});
105105
}
106106

107+
@Override
108+
public boolean establishLackingConnections() {
109+
return connectionMode.compareAndSet(ConnectionMode.OFF, ConnectionMode.PARTIAL);
110+
}
111+
107112
private CompletableFuture<TarantoolConnection> getConnectionInternal() {
108113
CompletableFuture<TarantoolConnection> result;
109114
ConnectionMode currentMode = connectionMode.get();

src/main/java/io/tarantool/driver/core/connection/TarantoolConnectionManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,11 @@ public interface TarantoolConnectionManager extends AutoCloseable {
1818
* @return a future with next connection in order
1919
*/
2020
CompletableFuture<TarantoolConnection> getConnection();
21+
22+
/**
23+
* Starts the process of establishing lacking connections to each host
24+
*
25+
* @return returns true if the establishing process has been started, else false
26+
*/
27+
boolean establishLackingConnections();
2128
}

src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void httpClusterDiscovererTest() throws TarantoolClientException {
4242
HTTPDiscoveryClusterAddressProvider addressProvider = getHttpProvider();
4343
Collection<TarantoolServerAddress> nodes = addressProvider.getAddresses();
4444

45-
assertEquals(nodes.size(), 2);
45+
assertEquals(nodes.size(), 3);
4646
Set<TarantoolServerAddress> nodeSet = new HashSet<>(nodes);
4747
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI)));
4848
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI)));
@@ -68,7 +68,7 @@ public void binaryClusterDiscovererTest() {
6868
TarantoolClusterAddressProvider addressProvider = getBinaryProvider();
6969

7070
Collection<TarantoolServerAddress> nodes = addressProvider.getAddresses();
71-
assertEquals(nodes.size(), 2);
71+
assertEquals(nodes.size(), 3);
7272
Set<TarantoolServerAddress> nodeSet = new HashSet<>(nodes);
7373
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI)));
7474
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI)));
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package io.tarantool.driver.integration;
2+
3+
import io.tarantool.driver.api.TarantoolClient;
4+
import io.tarantool.driver.api.TarantoolClientFactory;
5+
import io.tarantool.driver.api.TarantoolResult;
6+
import io.tarantool.driver.api.TarantoolServerAddress;
7+
import io.tarantool.driver.api.tuple.TarantoolTuple;
8+
import org.junit.jupiter.api.BeforeAll;
9+
import org.junit.jupiter.api.Test;
10+
import org.testcontainers.containers.output.WaitingConsumer;
11+
import org.testcontainers.junit.jupiter.Testcontainers;
12+
13+
import java.util.HashSet;
14+
import java.util.Set;
15+
import java.util.concurrent.TimeoutException;
16+
17+
import static org.junit.jupiter.api.Assertions.assertEquals;
18+
19+
@Testcontainers
20+
public class ReconnectIT extends SharedCartridgeContainer {
21+
22+
private static String USER_NAME;
23+
private static String PASSWORD;
24+
25+
@BeforeAll
26+
public static void setUp() throws TimeoutException {
27+
startCluster();
28+
29+
WaitingConsumer waitingConsumer = new WaitingConsumer();
30+
container.followOutput(waitingConsumer);
31+
waitingConsumer.waitUntil(f -> f.getUtf8String().contains("The cluster is balanced ok"));
32+
33+
USER_NAME = container.getUsername();
34+
PASSWORD = container.getPassword();
35+
}
36+
37+
@Test
38+
public void test_should_reconnect_ifReconnectIsInvoked() throws Exception {
39+
//when
40+
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> client =
41+
TarantoolClientFactory.createClient()
42+
.withAddresses(
43+
new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3301)),
44+
new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3311)),
45+
new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3312))
46+
)
47+
.withCredentials(USER_NAME, PASSWORD)
48+
.withConnections(10)
49+
.build();
50+
51+
// getting all routers uuids
52+
final Set<String> routerUuids = getInstancesUuids(client);
53+
54+
// stop routers
55+
container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "router");
56+
container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "second-router");
57+
58+
// check that there is only one instance left
59+
assertEquals(getInstanceUuid(client), getInstanceUuid(client));
60+
61+
// start routers
62+
container.execInContainer("cartridge", "start", "--run-dir=/tmp/run", "--data-dir=/tmp/data", "-d");
63+
64+
client.establishLackingConnections();
65+
Thread.sleep(3000);
66+
67+
// getting all routers uuids after restarting
68+
final Set<String> uuidsAfterReconnect = getInstancesUuids(client);
69+
70+
// check that amount of routers is equal initial amount
71+
assertEquals(routerUuids.size(), uuidsAfterReconnect.size());
72+
}
73+
74+
/**
75+
* Return all instances uuids from cluster, using round robin connection selection strategy
76+
*
77+
* @param client Tarantool client
78+
* @return set of instances uuids from cluster
79+
*/
80+
private Set<String> getInstancesUuids(TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> client) {
81+
String firstUuid = getInstanceUuid(client);
82+
83+
final Set<String> routerUuids = new HashSet<>();
84+
routerUuids.add(firstUuid);
85+
86+
String currentUuid = "";
87+
while (!firstUuid.equals(currentUuid)) {
88+
currentUuid = getInstanceUuid(client);
89+
routerUuids.add(currentUuid);
90+
}
91+
92+
return routerUuids;
93+
}
94+
95+
private String getInstanceUuid(TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> client) {
96+
return (String) client.eval("return box.info().uuid").join().get(0);
97+
}
98+
}

src/test/resources/cartridge/instances.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ testapp.second-router:
88
advertise_uri: localhost:3311
99
http_port: 8091
1010

11+
testapp.third-router:
12+
workdir: ./tmp/db_dev/3312
13+
advertise_uri: localhost:3312
14+
http_port: 8092
15+
1116
testapp.s1-master:
1217
workdir: ./tmp/db_dev/3302
1318
advertise_uri: localhost:3302
Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
cartridge = require('cartridge')
2-
replicasets = {{
3-
alias = 'app-router',
4-
roles = {'vshard-router', 'app.roles.custom', 'app.roles.api_router'},
5-
join_servers = {{uri = 'localhost:3301'}}
6-
}, {
7-
alias = 'app-router-second',
8-
roles = {'vshard-router', 'app.roles.custom', 'app.roles.api_router'},
9-
join_servers = {{uri = 'localhost:3311'}}
10-
}, {
11-
alias = 's1-storage',
12-
roles = {'vshard-storage', 'app.roles.api_storage'},
13-
join_servers = {{uri = 'localhost:3302'}, {uri = 'localhost:3303'}}
14-
}, {
15-
alias = 's2-storage',
16-
roles = {'vshard-storage', 'app.roles.api_storage'},
17-
join_servers = {{uri = 'localhost:3304'}, {uri = 'localhost:3305'}}
18-
}}
19-
return cartridge.admin_edit_topology({replicasets = replicasets})
2+
replicasets = { {
3+
alias = 'app-router',
4+
roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' },
5+
join_servers = { { uri = 'localhost:3301' } }
6+
}, {
7+
alias = 'app-router-second',
8+
roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' },
9+
join_servers = { { uri = 'localhost:3311' } }
10+
}, {
11+
alias = 'app-router-third',
12+
roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' },
13+
join_servers = { { uri = 'localhost:3312' } }
14+
}, {
15+
alias = 's1-storage',
16+
roles = { 'vshard-storage', 'app.roles.api_storage' },
17+
join_servers = { { uri = 'localhost:3302' }, { uri = 'localhost:3303' } }
18+
}, {
19+
alias = 's2-storage',
20+
roles = { 'vshard-storage', 'app.roles.api_storage' },
21+
join_servers = { { uri = 'localhost:3304' }, { uri = 'localhost:3305' } }
22+
} }
23+
return cartridge.admin_edit_topology({ replicasets = replicasets })

0 commit comments

Comments
 (0)