Skip to content

Commit 657b984

Browse files
authored
Fix union types in CCS (#128111) (#128207)
Backport #128111 to 9.0 Currently, union types in CCS is broken. For example, FROM *:remote-indices | EVAL port = TO_INT(port) returns all nulls if the types of the port field conflict. This happens because converters are a map of the fully qualified cluster:index -name (defined in MultiTypeEsField), but we are looking up the converter using only the index name, which leads to a wrong or missing converter on remote clusters. Our tests didn't catch this because MultiClusterSpecIT generates the same index for both clusters, allowing the local converter to be used for remote indices.
1 parent a02b385 commit 657b984

File tree

4 files changed

+38
-1
lines changed

4 files changed

+38
-1
lines changed

docs/changelog/128111.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128111
2+
summary: Fix union types in CCS
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ public static org.elasticsearch.Version remoteClusterVersion() {
5656
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
5757
}
5858

59+
public static org.elasticsearch.Version bwcVersion() {
60+
org.elasticsearch.Version local = localClusterVersion();
61+
org.elasticsearch.Version remote = remoteClusterVersion();
62+
return local.before(remote) ? local : remote;
63+
}
64+
5965
private static Version distributionVersion(String key) {
6066
final String val = System.getProperty(key);
6167
return val != null ? Version.fromString(val) : Version.CURRENT;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,4 +1058,29 @@ private void clearSkipUnavailable() {
10581058
.setPersistentSettings(settingsBuilder.build())
10591059
.get();
10601060
}
1061+
1062+
public void testMultiTypes() throws Exception {
1063+
Client remoteClient = client(REMOTE_CLUSTER_1);
1064+
int totalDocs = 0;
1065+
for (String type : List.of("integer", "long")) {
1066+
String index = "conflict-index-" + type;
1067+
assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type));
1068+
int numDocs = between(1, 10);
1069+
for (int i = 0; i < numDocs; i++) {
1070+
remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get();
1071+
}
1072+
remoteClient.admin().indices().prepareRefresh(index).get();
1073+
totalDocs += numDocs;
1074+
}
1075+
for (String castFunction : List.of("TO_LONG", "TO_INT")) {
1076+
EsqlQueryRequest request = new EsqlQueryRequest();
1077+
request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)");
1078+
try (EsqlQueryResponse resp = runQuery(request)) {
1079+
List<List<Object>> values = getValuesList(resp);
1080+
assertThat(values, hasSize(1));
1081+
assertThat(values.get(0), hasSize(1));
1082+
assertThat(values.get(0).get(0), equalTo((long) totalDocs));
1083+
}
1084+
}
1085+
}
10611086
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy
142142
BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference);
143143
MultiTypeEsField unionTypes = findUnionTypes(attr);
144144
if (unionTypes != null) {
145-
String indexName = shardContext.ctx.index().getName();
145+
// Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
146+
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
146147
Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
147148
return conversion == null
148149
? BlockLoader.CONSTANT_NULLS

0 commit comments

Comments
 (0)