Skip to content

Filter wrong formatted discovery addresses #197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,11 @@ tarantool> function get_cluster_nodes() return { 'host1:3301', 'host2:3302', 'ho

You need to pay attention to a function contract we are currently supporting:
* The client never passes any arguments to a discovery function.
* A discovery function _should_ return a single result of strings (i.e. single
string `return 'host:3301'` or array of strings `return {'host1:3301', 'host2:3301'}`).
* A discovery function _must_ return an array of strings (i.e `return {'host1:3301', 'host2:3301'}`).
* Each string _should_ satisfy the following pattern `host[:port]`
(or more strictly `/^[^:]+(:\d+)?$/` - a mandatory host containing any string
and an optional colon followed by digits of the port). Also, the port must be
in a range between 1 and 65535 if one is presented.
* A discovery function _may_ return multi-results but the client takes
into account only first of them (i.e. `return {'host:3301'}, discovery_delay`, where
the second result is unused). Even more, any extra results __are reserved__ by the client
Expand Down Expand Up @@ -271,6 +274,8 @@ client.syncOps().insert(45, Arrays.asList(1, 1));
* A discovery task always uses an active client connection to get the nodes list.
It's in your responsibility to provide a function availability as well as a consistent
nodes list on all instances you initially set or obtain from the task.
* Every address which is unmatched with `host[:port]` pattern will be filtered out from
the target addresses list.
* If some error occurs while a discovery task is running then this task
will be aborted without any after-effects for next task executions. These cases, for instance, are
a wrong function result (see discovery function contract) or a broken connection.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tarantool/AbstractTarantoolOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
public abstract class AbstractTarantoolOps<Space, Tuple, Operation, Result>
implements TarantoolClientOps<Space, Tuple, Operation, Result> {

private Code callCode = Code.OLD_CALL;
private Code callCode = Code.CALL;

protected abstract Result exec(Code code, Object... args);

Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/tarantool/TarantoolClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ public class TarantoolClientConfig {
public long writeTimeoutMillis = 60 * 1000L;

/**
* Use old call command https://github.com/tarantool/doc/issues/54,
* please ensure that you server supports new call command.
* Use new call method instead of obsolete
* {@code call_16} which used to work in Tarantool v1.6.
*
* Since 1.9.3, this flag has become enabled by default
*/
public boolean useNewCall = false;
public boolean useNewCall = true;

/**
* Max time to establish connection to the server
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/tarantool/TarantoolClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ private void initClient(SocketChannelProvider socketProvider, TarantoolClientCon
this.syncOps = new SyncOps();
this.composableAsyncOps = new ComposableAsyncOps();
this.fireAndForgetOps = new FireAndForgetOps();
if (config.useNewCall) {
setCallCode(Code.CALL);
this.syncOps.setCallCode(Code.CALL);
this.fireAndForgetOps.setCallCode(Code.CALL);
this.composableAsyncOps.setCallCode(Code.CALL);
if (!config.useNewCall) {
setCallCode(Code.OLD_CALL);
this.syncOps.setCallCode(Code.OLD_CALL);
this.fireAndForgetOps.setCallCode(Code.OLD_CALL);
this.composableAsyncOps.setCallCode(Code.OLD_CALL);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.tarantool.TarantoolClient;
import org.tarantool.TarantoolClientOps;
import org.tarantool.TarantoolClusterClientConfig;
import org.tarantool.util.StringUtils;

import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -36,28 +37,57 @@ public Set<String> getInstances() {
// validation against the data returned;
// this strict-mode allows us to extend the contract in a non-breaking
// way for old clients just reserve an extra return value in
// terms of LUA multi-result support.
checkResult(list);

List<Object> funcResult = (List<Object>) list.get(0);
return funcResult.stream()
.map(Object::toString)
.collect(Collectors.toCollection(LinkedHashSet::new));
// terms of Lua multi-result support.;
return checkAndFilterAddresses(list);
}

/**
* Check whether the result follows the contract or not.
* The contract is a mandatory <b>single array of strings</b>
* The contract is a mandatory <b>single array of strings</b>.
*
* The simplified format for each string is host[:port].
*
* @param result result to be validated
*/
private void checkResult(List<?> result) {
private Set<String> checkAndFilterAddresses(List<?> result) {
if (result == null || result.isEmpty()) {
throw new IllegalDiscoveryFunctionResult("Discovery function returned no data");
}
if (!((List<Object>)result.get(0)).stream().allMatch(item -> item instanceof String)) {
if (!(result.get(0) instanceof List)) {
throw new IllegalDiscoveryFunctionResult("The first value must be an array of strings");
}

return ((List<Object>) result.get(0)).stream()
.filter(item -> item instanceof String)
.map(Object::toString)
.filter(s -> !StringUtils.isBlank(s))
.filter(this::isAddress)
.collect(Collectors.toCollection(LinkedHashSet::new));
}

/**
* Checks that address matches host[:port] format.
*
* @param address to be checked
* @return true if address follows the format
*/
private boolean isAddress(String address) {
if (address.endsWith(":")) {
return false;
}
String[] addressParts = address.split(":");
if (addressParts.length > 2) {
return false;
}
if (addressParts.length == 2) {
try {
int port = Integer.parseInt(addressParts[1]);
return (port > 0 && port < 65536);
} catch (NumberFormatException e) {
return false;
}
}
return true;
}

}
3 changes: 1 addition & 2 deletions src/test/java/org/tarantool/ClientAsyncOperationsIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ void testCall(AsyncOpsProvider provider) throws ExecutionException, InterruptedE
testHelper.executeLua("function echo(...) return ... end");

Future<List<?>> fut = provider.getAsyncOps().call("echo", "hello");
assertEquals(Collections.singletonList(Collections.singletonList("hello")),
fut.get(TIMEOUT, TimeUnit.MILLISECONDS));
assertEquals(Collections.singletonList("hello"), fut.get(TIMEOUT, TimeUnit.MILLISECONDS));

provider.close();
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/tarantool/ClientReconnectClusterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ void testDelayFunctionResultFetch() {
instances.get(SRV1)
.executeLua("co = coroutine.create(function() " + functionBody + " end)");
instances.get(SRV1)
.executeLua("function getAddressesFunction() local c, r = coroutine.resume(co); return r end");
.executeLua("function getAddressesFunction() local c, r = coroutine.resume(co); return {r} end");

String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.singletonList(service3Address));
instances.get(SRV2).executeLua(infoFunctionScript);
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/tarantool/TarantoolClientOpsIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ public void testEval(SyncOpsProvider provider) {
@MethodSource("getClientOps")
public void testCall(SyncOpsProvider provider) {
assertEquals(
Collections.singletonList(Collections.singletonList("true")),
Collections.singletonList("true"),
provider.getClientOps().call("echo", "true")
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tarantool.cluster;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -23,6 +24,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -72,7 +74,7 @@ public void testSuccessfulAddressParsing() {
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

Set<String> instances = discoverer.getInstances();

Expand All @@ -91,7 +93,7 @@ public void testSuccessfulUniqueAddressParsing() {
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

Set<String> instances = discoverer.getInstances();

Expand All @@ -110,7 +112,7 @@ public void testFunctionReturnedEmptyList() {
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

Set<String> instances = discoverer.getInstances();

Expand All @@ -124,7 +126,7 @@ public void testWrongFunctionName() {
clusterConfig.clusterDiscoveryEntryFunction = "wrongFunction";

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

assertThrows(TarantoolException.class, discoverer::getInstances);
}
Expand All @@ -136,7 +138,7 @@ public void testWrongInstanceAddress() {

client.close();
TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

assertThrows(CommunicationException.class, discoverer::getInstances);
}
Expand All @@ -148,7 +150,19 @@ public void testWrongTypeResultData() {
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

assertThrows(IllegalDiscoveryFunctionResult.class, discoverer::getInstances);
}

@Test
@DisplayName("fetched with an exception when a single string returned")
public void testSingleStringResultData() {
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, "'host1:3301'");
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

assertThrows(IllegalDiscoveryFunctionResult.class, discoverer::getInstances);
}
Expand All @@ -172,7 +186,7 @@ public void testWrongMultiResultData() {
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

Set<String> instances = discoverer.getInstances();

Expand All @@ -188,9 +202,68 @@ public void testFunctionWithError() {
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

assertThrows(TarantoolException.class, discoverer::getInstances);
}

@Test
@DisplayName("fetched a subset of valid addresses")
public void testFilterBadAddressesData() {
final List<String> allHosts = Arrays.asList(
"host1:3313",
"host:abc",
"192.168.33.90",
"myHost",
"10.30.10.4:7814",
"host:311:sub-host",
"instance-2:",
"host:0",
"host:321981"
);

final Set<String> expectedFiltered = new HashSet<>(
Arrays.asList(
"host1:3313",
"192.168.33.90",
"myHost",
"10.30.10.4:7814"
)
);

String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, allHosts);
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

Set<String> instances = discoverer.getInstances();

assertNotNull(instances);
assertFalse(instances.isEmpty());
assertEquals(expectedFiltered, instances);
}

@Test
@DisplayName("fetched an empty set after filtration")
public void testFullBrokenAddressesList() {
List<String> allHosts = Arrays.asList(
"abc:edf",
"192.168.33.90:",
"host:-123",
"host:0"
);

String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, allHosts);
testHelper.executeLua(functionCode);

TarantoolClusterStoredFunctionDiscoverer discoverer =
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);

Set<String> instances = discoverer.getInstances();

assertNotNull(instances);
assertTrue(instances.isEmpty());
}

}