Skip to content

Commit acfa947

Browse files
committed
Fix union types in CCS
1 parent 38b62ab commit acfa947

File tree

3 files changed

+54
-10
lines changed

3 files changed

+54
-10
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
7676

7777
private static TestFeatureService remoteFeaturesService;
7878
private static RestClient remoteClusterClient;
79+
private static DataLocation dataLocation = null;
7980

8081
@ParametersFactory(argumentFormatting = "%2$s.%3$s")
8182
public static List<Object[]> readScriptSpec() throws Exception {
@@ -188,8 +189,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
188189
*/
189190
static RestClient twoClients(RestClient localClient, RestClient remoteClient) throws IOException {
190191
RestClient twoClients = mock(RestClient.class);
192+
assertNotNull("data location was set", dataLocation);
191193
// write to a single cluster for now due to the precision of some functions such as avg and tests related to updates
192-
final RestClient bulkClient = randomFrom(localClient, remoteClient);
194+
final RestClient bulkClient = dataLocation == DataLocation.REMOTE_ONLY ? remoteClient : randomFrom(localClient, remoteClient);
193195
when(twoClients.performRequest(any())).then(invocation -> {
194196
Request request = invocation.getArgument(0);
195197
String endpoint = request.getEndpoint();
@@ -214,6 +216,11 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
214216
return twoClients;
215217
}
216218

219+
enum DataLocation {
220+
REMOTE_ONLY,
221+
ANY_CLUSTER
222+
}
223+
217224
static Request[] cloneRequests(Request orig, int numClones) throws IOException {
218225
Request[] clones = new Request[numClones];
219226
for (int i = 0; i < clones.length; i++) {
@@ -238,26 +245,33 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
238245
* Convert FROM employees ... => FROM *:employees,employees
239246
*/
240247
static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
248+
if (dataLocation == null) {
249+
dataLocation = randomFrom(DataLocation.values());
250+
}
241251
String query = testCase.query;
242252
String[] commands = query.split("\\|");
243253
String first = commands[0].trim();
244254
if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) {
245255
String[] parts = commands[0].split("(?i)metadata");
246256
assert parts.length >= 1 : parts;
247257
String fromStatement = parts[0];
248-
249-
String[] localIndices = fromStatement.substring("FROM ".length()).split(",");
250-
String remoteIndices = Arrays.stream(localIndices)
251-
.map(index -> "*:" + index.trim() + "," + index.trim())
252-
.collect(Collectors.joining(","));
253-
var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length());
258+
String index = fromStatement.substring("FROM ".length());
259+
if (dataLocation == DataLocation.REMOTE_ONLY && randomBoolean()) {
260+
index = remoteIndices(index);
261+
} else {
262+
index = index + "," + remoteIndices(index);
263+
}
264+
var newFrom = "FROM " + index + " " + commands[0].substring(fromStatement.length());
254265
testCase.query = newFrom + query.substring(first.length());
255266
}
256267
if (commands[0].toLowerCase(Locale.ROOT).startsWith("ts ")) {
257268
String[] parts = commands[0].split("\\s+");
258269
assert parts.length >= 2 : commands[0];
259-
String[] indices = parts[1].split(",");
260-
parts[1] = Arrays.stream(indices).map(index -> "*:" + index + "," + index).collect(Collectors.joining(","));
270+
if (dataLocation == DataLocation.REMOTE_ONLY && randomBoolean()) {
271+
parts[1] = remoteIndices(parts[1]);
272+
} else {
273+
parts[1] = parts[1] + "," + remoteIndices(parts[1]);
274+
}
261275
String newNewMetrics = String.join(" ", parts);
262276
testCase.query = newNewMetrics + query.substring(first.length());
263277
}
@@ -274,6 +288,11 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
274288
return testCase;
275289
}
276290

291+
static String remoteIndices(String localIndices) {
292+
String[] parts = localIndices.split(",");
293+
return Arrays.stream(parts).map(index -> "*:" + index.trim()).collect(Collectors.joining(","));
294+
}
295+
277296
static boolean hasIndexMetadata(String query) {
278297
String[] commands = query.split("\\|");
279298
if (commands[0].trim().toLowerCase(Locale.ROOT).startsWith("from")) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,4 +991,28 @@ void populateRemoteIndicesFail(String clusterAlias, String indexName, int numSha
991991
remoteClient.admin().indices().prepareRefresh(indexName).get();
992992
}
993993

994+
public void testMultiTypes() throws Exception {
995+
Client remoteClient = client(REMOTE_CLUSTER_1);
996+
int totalDocs = 0;
997+
for (String type : List.of("integer", "long")) {
998+
String index = "conflict-index-" + type;
999+
assertAcked(remoteClient.admin().indices().prepareCreate(index).setMapping("port", "type=" + type));
1000+
int numDocs = between(1, 10);
1001+
for (int i = 0; i < numDocs; i++) {
1002+
remoteClient.prepareIndex(index).setId(Integer.toString(i)).setSource("port", i).get();
1003+
}
1004+
remoteClient.admin().indices().prepareRefresh(index).get();
1005+
totalDocs += numDocs;
1006+
}
1007+
for (String castFunction : List.of("TO_LONG", "TO_INT")) {
1008+
EsqlQueryRequest request = new EsqlQueryRequest();
1009+
request.query("FROM *:conflict-index-* | EVAL port=" + castFunction + "(port) | WHERE port is NOT NULL | STATS COUNT(port)");
1010+
try (EsqlQueryResponse resp = runQuery(request)) {
1011+
List<List<Object>> values = getValuesList(resp);
1012+
assertThat(values, hasSize(1));
1013+
assertThat(values.get(0), hasSize(1));
1014+
assertThat(values.get(0).get(0), equalTo((long) totalDocs));
1015+
}
1016+
}
1017+
}
9941018
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy
157157
BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference);
158158
MultiTypeEsField unionTypes = findUnionTypes(attr);
159159
if (unionTypes != null) {
160-
String indexName = shardContext.ctx.index().getName();
160+
// Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix
161+
String indexName = shardContext.ctx.getFullyQualifiedIndex().getName();
161162
Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
162163
return conversion == null
163164
? BlockLoader.CONSTANT_NULLS

0 commit comments

Comments
 (0)