diff --git a/docs/changelog/129164.yaml b/docs/changelog/129164.yaml new file mode 100644 index 0000000000000..0d16fdf1b239e --- /dev/null +++ b/docs/changelog/129164.yaml @@ -0,0 +1,5 @@ +pr: 129164 +summary: Log partial failures +area: ES|QL +type: feature +issues: [] diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java index 3d38b697dc5be..b432339ccf4fd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java @@ -9,6 +9,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -21,6 +22,7 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.MediaType; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.esql.arrow.ArrowFormat; @@ -30,6 +32,7 @@ import java.io.IOException; import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -121,6 +124,7 @@ private EsqlResponseListener(RestChannel channel, RestRequest restRequest, Strin @Override protected void processResponse(EsqlQueryResponse esqlQueryResponse) throws IOException { + logPartialFailures(channel.request().rawPath(), channel.request().params(), esqlQueryResponse.getExecutionInfo()); channel.sendResponse(buildResponse(esqlQueryResponse)); } @@ -229,4 +233,24 @@ private void checkDelimiter() { throw new IllegalArgumentException(message); } } + + /** + * Log all partial request failures to the {@code rest.suppressed} logger + * so an operator can categorize them after the fact. + */ + static void logPartialFailures(String rawPath, Map params, EsqlExecutionInfo executionInfo) { + if (executionInfo == null) { + return; + } + for (EsqlExecutionInfo.Cluster cluster : executionInfo.getClusters().values()) { + for (ShardSearchFailure failure : cluster.getFailures()) { + if (LOGGER.isWarnEnabled()) { + String clusterMessage = cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) + ? "" + : ", cluster: " + cluster.getClusterAlias(); + LOGGER.warn("partial failure at path: {}, params: {}{}", rawPath, params, clusterMessage, failure); + } + } + } + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java new file mode 100644 index 0000000000000..c96af8b00ab99 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlResponseListenerTests.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Configurator; +import org.apache.logging.log4j.core.filter.RegexFilter; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteClusterAware; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class EsqlResponseListenerTests extends ESTestCase { + private final String LOCAL_CLUSTER_ALIAS = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + + private static MockAppender appender; + static Logger logger = LogManager.getLogger(EsqlResponseListener.class); + + @BeforeClass + public static void init() throws IllegalAccessException { + appender = new MockAppender("testAppender"); + appender.start(); + Configurator.setLevel(logger, Level.DEBUG); + Loggers.addAppender(logger, appender); + } + + @After + public void clear() { + appender.events.clear(); + } + + @AfterClass + public static void cleanup() { + appender.stop(); + Loggers.removeAppender(logger, appender); + } + + public void testLogPartialFailures() { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(false); + executionInfo.swapCluster( + LOCAL_CLUSTER_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster( + LOCAL_CLUSTER_ALIAS, + "idx", + false, + EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, + 10, + 10, + 3, + 0, + List.of( + new ShardSearchFailure(new Exception("dummy"), target(LOCAL_CLUSTER_ALIAS, 0)), + new ShardSearchFailure(new Exception("error"), target(LOCAL_CLUSTER_ALIAS, 1)) + ), + new TimeValue(4444L) + ) + ); + EsqlResponseListener.logPartialFailures("/_query", Map.of(), executionInfo); + + assertThat(appender.events, hasSize(2)); + LogEvent logEvent = appender.events.get(0); + assertThat(logEvent.getLevel(), equalTo(Level.WARN)); + assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("partial failure at path: /_query, params: {}")); + assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("dummy")); + logEvent = appender.events.get(1); + assertThat(logEvent.getLevel(), equalTo(Level.WARN)); + assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("partial failure at path: /_query, params: {}")); + assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("error")); + } + + public void testLogPartialFailuresRemote() { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(false); + executionInfo.swapCluster( + "remote_cluster", + (k, v) -> new EsqlExecutionInfo.Cluster( + "remote_cluster", + "idx", + false, + EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, + 10, + 10, + 3, + 0, + List.of(new ShardSearchFailure(new Exception("dummy"), target("remote_cluster", 0))), + new TimeValue(4444L) + ) + ); + EsqlResponseListener.logPartialFailures("/_query", Map.of(), executionInfo); + + assertThat(appender.events, hasSize(1)); + LogEvent logEvent = appender.events.get(0); + assertThat(logEvent.getLevel(), equalTo(Level.WARN)); + assertThat( + logEvent.getMessage().getFormattedMessage(), + equalTo("partial failure at path: /_query, params: {}, cluster: remote_cluster") + ); + assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("dummy")); + } + + private SearchShardTarget target(String clusterAlias, int shardId) { + return new SearchShardTarget("node", new ShardId("idx", "uuid", shardId), clusterAlias); + } + + private static class MockAppender extends AbstractAppender { + public final List events = new ArrayList<>(); + + MockAppender(final String name) throws IllegalAccessException { + super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false); + } + + @Override + public void append(LogEvent event) { + events.add(event.toImmutable()); + } + } +}