From e3333f51e67e81351aa3b11404a7f561df1737d2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 May 2025 22:47:30 -0700 Subject: [PATCH 1/2] Fix union types in CCS (#128111) Currently, union types in CCS is broken. For example, FROM *:remote-indices | EVAL port = TO_INT(port) returns all nulls if the types of the port field conflict. This happens because converters are a map of the fully qualified cluster:index -name (defined in MultiTypeEsField), but we are looking up the converter using only the index name, which leads to a wrong or missing converter on remote clusters. Our tests didn't catch this because MultiClusterSpecIT generates the same index for both clusters, allowing the local converter to be used for remote indices. --- docs/changelog/128111.yaml | 5 ++++ .../xpack/esql/ccq/Clusters.java | 6 +++++ .../esql/action/CrossClustersQueryIT.java | 24 +++++++++++++++++++ .../planner/EsPhysicalOperationProviders.java | 3 ++- 4 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 docs/changelog/128111.yaml diff --git a/docs/changelog/128111.yaml b/docs/changelog/128111.yaml new file mode 100644 index 0000000000000..d3b113a682d4a --- /dev/null +++ b/docs/changelog/128111.yaml @@ -0,0 +1,5 @@ +pr: 128111 +summary: Fix union types in CCS +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 5f3f135810322..392745d5084e2 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -56,6 +56,12 @@ public static org.elasticsearch.Version remoteClusterVersion() { return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT; } + public static org.elasticsearch.Version bwcVersion() { + org.elasticsearch.Version local = localClusterVersion(); + org.elasticsearch.Version remote = remoteClusterVersion(); + return local.before(remote) ? local : remote; + } + private static Version distributionVersion(String key) { final String val = System.getProperty(key); return val != null ? Version.fromString(val) : Version.CURRENT; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 1a99fd9d0383d..54b0f429eef7e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -1058,4 +1058,28 @@ private void clearSkipUnavailable() { .setPersistentSettings(settingsBuilder.build()) .get(); } + public void testMultiTypes() throws Exception { + Client remoteClient = client(REMOTE_CLUSTER_1); + int totalDocs = 0; + for (String type : List.of("integer", "long")) { + String index = "conflict-index-" + type; + assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type)); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get(); + } + remoteClient.admin().indices().prepareRefresh(index).get(); + totalDocs += numDocs; + } + for (String castFunction : List.of("TO_LONG", "TO_INT")) { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)"); + try (EsqlQueryResponse resp = runQuery(request)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(1)); + assertThat(values.get(0), hasSize(1)); + assertThat(values.get(0).get(0), equalTo((long) totalDocs)); + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index b2d627f4090b3..fdc6701cd0d9b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -142,7 +142,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference); MultiTypeEsField unionTypes = findUnionTypes(attr); if (unionTypes != null) { - String indexName = shardContext.ctx.index().getName(); + // Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix + String indexName = shardContext.ctx.getFullyQualifiedIndex().getName(); Expression conversion = unionTypes.getConversionExpressionForIndex(indexName); return conversion == null ? BlockLoader.CONSTANT_NULLS From 1b90caeb8d7ccc80ad865f59715877aa21f5a51d Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 20 May 2025 15:37:20 +0000 Subject: [PATCH 2/2] [CI] Auto commit changes from spotless --- .../elasticsearch/xpack/esql/action/CrossClustersQueryIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 54b0f429eef7e..840ca0ed1a8a0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -1058,6 +1058,7 @@ private void clearSkipUnavailable() { .setPersistentSettings(settingsBuilder.build()) .get(); } + public void testMultiTypes() throws Exception { Client remoteClient = client(REMOTE_CLUSTER_1); int totalDocs = 0;