Skip to content

Commit f188a24

Browse files
authored
ESQL: Log partial failures (#129164) (#129261)
Now that ESQL has `allow_partial_results` we can reply with a `200` even though some nodes failed to run ESQL. This could happen because the node is restarting. Or because of a bug. Or a disconnect. All kinds of things. This logs those partial failures so an operator can look at them and get a sense of why they are happening.
1 parent 30cb238 commit f188a24

File tree

3 files changed

+167
-0
lines changed

3 files changed

+167
-0
lines changed

docs/changelog/129164.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 129164
2+
summary: Log partial failures
3+
area: ES|QL
4+
type: feature
5+
issues: []

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.search.ShardSearchFailure;
1213
import org.elasticsearch.core.Releasable;
1314
import org.elasticsearch.core.Releasables;
1415
import org.elasticsearch.core.TimeValue;
@@ -21,6 +22,7 @@
2122
import org.elasticsearch.rest.RestResponse;
2223
import org.elasticsearch.rest.RestStatus;
2324
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
25+
import org.elasticsearch.transport.RemoteClusterAware;
2426
import org.elasticsearch.xcontent.MediaType;
2527
import org.elasticsearch.xcontent.XContentType;
2628
import org.elasticsearch.xpack.esql.arrow.ArrowFormat;
@@ -30,6 +32,7 @@
3032

3133
import java.io.IOException;
3234
import java.util.Locale;
35+
import java.util.Map;
3336
import java.util.concurrent.TimeUnit;
3437
import java.util.function.Consumer;
3538

@@ -121,6 +124,7 @@ private EsqlResponseListener(RestChannel channel, RestRequest restRequest, Strin
121124

122125
@Override
123126
protected void processResponse(EsqlQueryResponse esqlQueryResponse) throws IOException {
127+
logPartialFailures(channel.request().rawPath(), channel.request().params(), esqlQueryResponse.getExecutionInfo());
124128
channel.sendResponse(buildResponse(esqlQueryResponse));
125129
}
126130

@@ -229,4 +233,24 @@ private void checkDelimiter() {
229233
throw new IllegalArgumentException(message);
230234
}
231235
}
236+
237+
/**
238+
* Log all partial request failures to the {@code rest.suppressed} logger
239+
* so an operator can categorize them after the fact.
240+
*/
241+
static void logPartialFailures(String rawPath, Map<String, String> params, EsqlExecutionInfo executionInfo) {
242+
if (executionInfo == null) {
243+
return;
244+
}
245+
for (EsqlExecutionInfo.Cluster cluster : executionInfo.getClusters().values()) {
246+
for (ShardSearchFailure failure : cluster.getFailures()) {
247+
if (LOGGER.isWarnEnabled()) {
248+
String clusterMessage = cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
249+
? ""
250+
: ", cluster: " + cluster.getClusterAlias();
251+
LOGGER.warn("partial failure at path: {}, params: {}{}", rawPath, params, clusterMessage, failure);
252+
}
253+
}
254+
}
255+
}
232256
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.core.LogEvent;
14+
import org.apache.logging.log4j.core.appender.AbstractAppender;
15+
import org.apache.logging.log4j.core.config.Configurator;
16+
import org.apache.logging.log4j.core.filter.RegexFilter;
17+
import org.elasticsearch.action.search.ShardSearchFailure;
18+
import org.elasticsearch.common.logging.Loggers;
19+
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.index.shard.ShardId;
21+
import org.elasticsearch.search.SearchShardTarget;
22+
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.transport.RemoteClusterAware;
24+
import org.junit.After;
25+
import org.junit.AfterClass;
26+
import org.junit.BeforeClass;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.Map;
31+
32+
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.hasSize;
34+
35+
public class EsqlResponseListenerTests extends ESTestCase {
36+
private final String LOCAL_CLUSTER_ALIAS = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
37+
38+
private static MockAppender appender;
39+
static Logger logger = LogManager.getLogger(EsqlResponseListener.class);
40+
41+
@BeforeClass
42+
public static void init() throws IllegalAccessException {
43+
appender = new MockAppender("testAppender");
44+
appender.start();
45+
Configurator.setLevel(logger, Level.DEBUG);
46+
Loggers.addAppender(logger, appender);
47+
}
48+
49+
@After
50+
public void clear() {
51+
appender.events.clear();
52+
}
53+
54+
@AfterClass
55+
public static void cleanup() {
56+
appender.stop();
57+
Loggers.removeAppender(logger, appender);
58+
}
59+
60+
public void testLogPartialFailures() {
61+
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(false);
62+
executionInfo.swapCluster(
63+
LOCAL_CLUSTER_ALIAS,
64+
(k, v) -> new EsqlExecutionInfo.Cluster(
65+
LOCAL_CLUSTER_ALIAS,
66+
"idx",
67+
false,
68+
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
69+
10,
70+
10,
71+
3,
72+
0,
73+
List.of(
74+
new ShardSearchFailure(new Exception("dummy"), target(LOCAL_CLUSTER_ALIAS, 0)),
75+
new ShardSearchFailure(new Exception("error"), target(LOCAL_CLUSTER_ALIAS, 1))
76+
),
77+
new TimeValue(4444L)
78+
)
79+
);
80+
EsqlResponseListener.logPartialFailures("/_query", Map.of(), executionInfo);
81+
82+
assertThat(appender.events, hasSize(2));
83+
LogEvent logEvent = appender.events.get(0);
84+
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
85+
assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("partial failure at path: /_query, params: {}"));
86+
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("dummy"));
87+
logEvent = appender.events.get(1);
88+
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
89+
assertThat(logEvent.getMessage().getFormattedMessage(), equalTo("partial failure at path: /_query, params: {}"));
90+
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("error"));
91+
}
92+
93+
public void testLogPartialFailuresRemote() {
94+
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(false);
95+
executionInfo.swapCluster(
96+
"remote_cluster",
97+
(k, v) -> new EsqlExecutionInfo.Cluster(
98+
"remote_cluster",
99+
"idx",
100+
false,
101+
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
102+
10,
103+
10,
104+
3,
105+
0,
106+
List.of(new ShardSearchFailure(new Exception("dummy"), target("remote_cluster", 0))),
107+
new TimeValue(4444L)
108+
)
109+
);
110+
EsqlResponseListener.logPartialFailures("/_query", Map.of(), executionInfo);
111+
112+
assertThat(appender.events, hasSize(1));
113+
LogEvent logEvent = appender.events.get(0);
114+
assertThat(logEvent.getLevel(), equalTo(Level.WARN));
115+
assertThat(
116+
logEvent.getMessage().getFormattedMessage(),
117+
equalTo("partial failure at path: /_query, params: {}, cluster: remote_cluster")
118+
);
119+
assertThat(logEvent.getThrown().getCause().getMessage(), equalTo("dummy"));
120+
}
121+
122+
private SearchShardTarget target(String clusterAlias, int shardId) {
123+
return new SearchShardTarget("node", new ShardId("idx", "uuid", shardId), clusterAlias);
124+
}
125+
126+
private static class MockAppender extends AbstractAppender {
127+
public final List<LogEvent> events = new ArrayList<>();
128+
129+
MockAppender(final String name) throws IllegalAccessException {
130+
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
131+
}
132+
133+
@Override
134+
public void append(LogEvent event) {
135+
events.add(event.toImmutable());
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)