Skip to content

Commit 720e3b5

Browse files
authored
Fix union types in CCS (#128111) (#128169)
Currently, union types in CCS are broken. For example, FROM *:remote-indices | EVAL port = TO_INT(port) returns all nulls if the types of the port field conflict. This happens because the converters are stored in a map keyed by the fully qualified cluster:index-name (defined in MultiTypeEsField), but we look up the converter using only the index name, which leads to a wrong or missing converter on remote clusters. Our tests didn't catch this because MultiClusterSpecIT generates the same indices on both clusters, allowing the local converter to be used for remote indices.
1 parent 173b661 commit 720e3b5

File tree

5 files changed

+67
-7
lines changed

5 files changed

+67
-7
lines changed

docs/changelog/128111.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128111
2+
summary: Fix union types in CCS
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ public static org.elasticsearch.Version remoteClusterVersion() {
6767
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
6868
}
6969

70+
public static org.elasticsearch.Version bwcVersion() {
71+
org.elasticsearch.Version local = localClusterVersion();
72+
org.elasticsearch.Version remote = remoteClusterVersion();
73+
return local.before(remote) ? local : remote;
74+
}
75+
7076
private static Version distributionVersion(String key) {
7177
final String val = System.getProperty(key);
7278
return val != null ? Version.fromString(val) : Version.CURRENT;

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
7373

7474
private static TestFeatureService remoteFeaturesService;
7575
private static RestClient remoteClusterClient;
76+
private static DataLocation dataLocation = null;
7677

7778
@ParametersFactory(argumentFormatting = "%2$s.%3$s")
7879
public static List<Object[]> readScriptSpec() throws Exception {
@@ -181,8 +182,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
181182
*/
182183
static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException {
183184
RestClient twoClients = mock(RestClient.class);
185+
assertNotNull("data location was set", dataLocation);
184186
// write to a single cluster for now due to the precision of some functions such as avg and tests related to updates
185-
final RestClient bulkClient = randomFrom(localClient, remoteClient);
187+
final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient);
186188
when(twoClients.performRequest(any())).then(invocation -> {
187189
Request request = invocation.getArgument(0);
188190
String endpoint = request.getEndpoint();
@@ -207,6 +209,11 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
207209
return twoClients;
208210
}
209211

212+
enum DataLocation {
213+
REMOTE_ONLY,
214+
ANY_CLUSTER
215+
}
216+
210217
static Request[] cloneRequests(Request orig, int numClones) throws IOException {
211218
Request[] clones = new Request[numClones];
212219
for (int i = 0; i < clones.length; i++) {
@@ -231,26 +238,37 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
231238
* Convert FROM employees ... => FROM *:employees,employees
232239
*/
233240
static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
241+
if (dataLocation == null) {
242+
dataLocation = randomFrom(DataLocation.values());
243+
}
234244
String query = testCase.query;
235245
String[] commands = query.split("\\|");
236246
String first = commands[0].trim();
237247
if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) {
238248
String[] parts = commands[0].split("(?i)metadata");
239249
assert parts.length >= 1 : parts;
240250
String fromStatement = parts[0];
241-
242251
String[] localIndices = fromStatement.substring("FROM ".length()).split(",");
243-
String remoteIndices = Arrays.stream(localIndices)
244-
.map(index -> "*:" + index.trim() + "," + index.trim())
245-
.collect(Collectors.joining(","));
252+
final String remoteIndices;
253+
if (canUseRemoteIndicesOnly() && randomBoolean()) {
254+
remoteIndices = Arrays.stream(localIndices).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
255+
} else {
256+
remoteIndices = Arrays.stream(localIndices)
257+
.map(index -> "*:" + index.trim() + "," + index.trim())
258+
.collect(Collectors.joining(","));
259+
}
246260
var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length());
247261
testCase.query = newFrom + query.substring(first.length());
248262
}
249263
if (commands[0].toLowerCase(Locale.ROOT).startsWith("metrics")) {
250264
String[] parts = commands[0].split("\\s+");
251265
assert parts.length >= 2 : commands[0];
252266
String[] indices = parts[1].split(",");
253-
parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(","));
267+
if (canUseRemoteIndicesOnly() && randomBoolean()) {
268+
parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
269+
} else {
270+
parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim() + "," + index.trim()).collect(Collectors.joining(","));
271+
}
254272
String newNewMetrics = String.join(" ", parts);
255273
testCase.query = newNewMetrics + query.substring(first.length());
256274
}
@@ -267,6 +285,12 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
267285
return testCase;
268286
}
269287

288+
static boolean canUseRemoteIndicesOnly() {
289+
// If the data is indexed only into the remote cluster, we can query only the remote indices.
290+
// However, due to the union types bug in CCS, we must include the local indices in versions without the fix.
291+
return dataLocation == DataLocation.REMOTE_ONLY && Clusters.bwcVersion().onOrAfter(Version.V_8_19_0);
292+
}
293+
270294
static boolean hasIndexMetadata(String query) {
271295
String[] commands = query.split("\\|");
272296
if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,4 +991,28 @@ void populateRemoteIndicesFail(String clusterAlias, String indexName, int numSha
991991
remoteClient.admin().indices().prepareRefresh(indexName).get();
992992
}
993993

994+
public void testMultiTypes() throws Exception {
995+
Client remoteClient = client(REMOTE_CLUSTER_1);
996+
int totalDocs = 0;
997+
for (String type : List.of("integer", "long")) {
998+
String index = "conflict-index-" + type;
999+
assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type));
1000+
int numDocs = between(1, 10);
1001+
for (int i = 0; i < numDocs; i++) {
1002+
remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get();
1003+
}
1004+
remoteClient.admin().indices().prepareRefresh(index).get();
1005+
totalDocs += numDocs;
1006+
}
1007+
for (String castFunction : List.of("TO_LONG", "TO_INT")) {
1008+
EsqlQueryRequest request = new EsqlQueryRequest();
1009+
request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)");
1010+
try (EsqlQueryResponse resp = runQuery(request)) {
1011+
List<List<Object>> values = getValuesList(resp);
1012+
assertThat(values, hasSize(1));
1013+
assertThat(values.get(0), hasSize(1));
1014+
assertThat(values.get(0).get(0), equalTo((long) totalDocs));
1015+
}
1016+
}
1017+
}
9941018
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ private BlockLoader getBlockLoaderFor(
161161
DefaultShardContext shardContext = (DefaultShardContext) shardContexts.get(shardId);
162162
BlockLoader blockLoader = shardContext.blockLoader(fieldName, isUnsupported, fieldExtractPreference);
163163
if (unionTypes != null) {
164-
String indexName = shardContext.ctx.index().getName();
164+
// Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
165+
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
165166
Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
166167
return conversion == null
167168
? BlockLoader.CONSTANT_NULLS

0 commit comments

Comments
 (0)