Skip to content

Commit 8db721b

Browse files
talevyjasontedor
authored andcommitted
add _retry API to index lifecycle policies (#30769)
1 parent f8fd6af commit 8db721b

File tree

11 files changed

+575
-7
lines changed

11 files changed

+575
-7
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*
6+
*/
7+
package org.elasticsearch.xpack.core.indexlifecycle.action;
8+
9+
import org.elasticsearch.action.Action;
10+
import org.elasticsearch.action.ActionRequestBuilder;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.IndicesRequest;
13+
import org.elasticsearch.action.support.IndicesOptions;
14+
import org.elasticsearch.action.support.IndicesOptions.Option;
15+
import org.elasticsearch.action.support.IndicesOptions.WildcardStates;
16+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
17+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
18+
import org.elasticsearch.client.ElasticsearchClient;
19+
import org.elasticsearch.common.Strings;
20+
import org.elasticsearch.common.io.stream.StreamInput;
21+
import org.elasticsearch.common.io.stream.StreamOutput;
22+
import org.elasticsearch.common.xcontent.ToXContentObject;
23+
24+
import java.io.IOException;
25+
import java.util.Arrays;
26+
import java.util.EnumSet;
27+
import java.util.Objects;
28+
29+
public class RetryAction extends Action<RetryAction.Request, RetryAction.Response, RetryAction.RequestBuilder> {
30+
public static final RetryAction INSTANCE = new RetryAction();
31+
public static final String NAME = "indices:admin/xpack/index_lifecycle/_retry/post";
32+
33+
protected RetryAction() {
34+
super(NAME);
35+
}
36+
37+
@Override
38+
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
39+
return new RequestBuilder(client, this);
40+
}
41+
42+
@Override
43+
public Response newResponse() {
44+
return new Response();
45+
}
46+
47+
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
48+
49+
protected RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
50+
super(client, action, new Request());
51+
}
52+
53+
}
54+
55+
public static class Response extends AcknowledgedResponse implements ToXContentObject {
56+
57+
public Response() {
58+
}
59+
60+
public Response(boolean acknowledged) {
61+
super(acknowledged);
62+
}
63+
64+
@Override
65+
public void readFrom(StreamInput in) throws IOException {
66+
readAcknowledged(in);
67+
}
68+
69+
@Override
70+
public void writeTo(StreamOutput out) throws IOException {
71+
writeAcknowledged(out);
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(isAcknowledged());
77+
}
78+
79+
@Override
80+
public boolean equals(Object obj) {
81+
if (obj == null) {
82+
return false;
83+
}
84+
if (obj.getClass() != getClass()) {
85+
return false;
86+
}
87+
Response other = (Response) obj;
88+
return Objects.equals(isAcknowledged(), other.isAcknowledged());
89+
}
90+
91+
@Override
92+
public String toString() {
93+
return Strings.toString(this, true, true);
94+
}
95+
96+
}
97+
98+
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
99+
private String[] indices;
100+
101+
public Request(String... indices) {
102+
this.indices = indices;
103+
}
104+
105+
public Request() {
106+
}
107+
108+
@Override
109+
public Request indices(String... indices) {
110+
this.indices = indices;
111+
return this;
112+
}
113+
114+
@Override
115+
public String[] indices() {
116+
return indices;
117+
}
118+
119+
@Override
120+
public IndicesOptions indicesOptions() {
121+
// Re-run should only resolve to open concrete indices (not aliases)
122+
return new IndicesOptions(EnumSet.of(Option.IGNORE_ALIASES), EnumSet.of(WildcardStates.OPEN));
123+
}
124+
125+
@Override
126+
public ActionRequestValidationException validate() {
127+
return null;
128+
}
129+
130+
@Override
131+
public void readFrom(StreamInput in) throws IOException {
132+
super.readFrom(in);
133+
this.indices = in.readStringArray();
134+
}
135+
136+
@Override
137+
public void writeTo(StreamOutput out) throws IOException {
138+
super.writeTo(out);
139+
out.writeStringArray(indices);
140+
}
141+
142+
@Override
143+
public int hashCode() {
144+
return Arrays.hashCode(indices);
145+
}
146+
147+
@Override
148+
public boolean equals(Object obj) {
149+
if (obj == null) {
150+
return false;
151+
}
152+
if (obj.getClass() != getClass()) {
153+
return false;
154+
}
155+
Request other = (Request) obj;
156+
return Arrays.equals(indices, other.indices);
157+
}
158+
159+
}
160+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*
6+
*/
7+
package org.elasticsearch.xpack.core.indexlifecycle.action;
8+
9+
import org.elasticsearch.test.AbstractStreamableTestCase;
10+
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;
11+
12+
public class ReRunRequestTests extends AbstractStreamableTestCase<Request> {
13+
14+
@Override
15+
protected Request createTestInstance() {
16+
String[] indices = new String[randomIntBetween(1, 10)];
17+
for (int i = 0; i < indices.length; i++) {
18+
indices[i] = randomAlphaOfLengthBetween(2, 5);
19+
}
20+
return new Request(indices);
21+
}
22+
23+
@Override
24+
protected Request createBlankInstance() {
25+
return new Request();
26+
}
27+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*
6+
*/
7+
package org.elasticsearch.xpack.core.indexlifecycle.action;
8+
9+
import org.elasticsearch.test.AbstractStreamableTestCase;
10+
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;
11+
12+
public class ReRunResponseTests extends AbstractStreamableTestCase<Response> {
13+
14+
@Override
15+
protected Response createTestInstance() {
16+
return new Response(randomBoolean());
17+
}
18+
19+
@Override
20+
protected Response createBlankInstance() {
21+
return new Response();
22+
}
23+
24+
@Override
25+
protected Response mutateInstance(Response response) {
26+
return new Response(response.isAcknowledged() == false);
27+
}
28+
29+
}

x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,17 @@
3838
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
3939
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction;
4040
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
41+
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
4142
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
4243
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
4344
import org.elasticsearch.xpack.indexlifecycle.action.RestMoveToStepAction;
4445
import org.elasticsearch.xpack.indexlifecycle.action.RestPutLifecycleAction;
46+
import org.elasticsearch.xpack.indexlifecycle.action.RestRetryAction;
4547
import org.elasticsearch.xpack.indexlifecycle.action.TransportDeleteLifcycleAction;
4648
import org.elasticsearch.xpack.indexlifecycle.action.TransportGetLifecycleAction;
4749
import org.elasticsearch.xpack.indexlifecycle.action.TransportMoveToStepAction;
4850
import org.elasticsearch.xpack.indexlifecycle.action.TransportPutLifecycleAction;
51+
import org.elasticsearch.xpack.indexlifecycle.action.TransportRetryAction;
4952

5053
import java.time.Clock;
5154
import java.util.ArrayList;
@@ -140,7 +143,9 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
140143
new RestPutLifecycleAction(settings, restController),
141144
new RestGetLifecycleAction(settings, restController),
142145
new RestDeleteLifecycleAction(settings, restController),
143-
new RestMoveToStepAction(settings, restController));
146+
new RestMoveToStepAction(settings, restController),
147+
new RestRetryAction(settings, restController)
148+
);
144149
}
145150

146151
@Override
@@ -152,7 +157,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
152157
new ActionHandler<>(PutLifecycleAction.INSTANCE, TransportPutLifecycleAction.class),
153158
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
154159
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifcycleAction.class),
155-
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class));
160+
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class),
161+
new ActionHandler<>(RetryAction.INSTANCE, TransportRetryAction.class));
156162
}
157163

158164
@Override

x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.io.IOException;
3434
import java.util.function.LongSupplier;
35+
import java.util.function.Supplier;
3536

3637
public class IndexLifecycleRunner {
3738
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleRunner.class);
@@ -127,7 +128,7 @@ private void executeClusterStateSteps(Index index, String policy, Step step) {
127128
* @param indexSettings
128129
* the index settings to extract the {@link StepKey} from.
129130
*/
130-
static StepKey getCurrentStepKey(Settings indexSettings) {
131+
public static StepKey getCurrentStepKey(Settings indexSettings) {
131132
String currentPhase = LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(indexSettings);
132133
String currentAction = LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(indexSettings);
133134
String currentStep = LifecycleSettings.LIFECYCLE_STEP_SETTING.get(indexSettings);
@@ -199,13 +200,35 @@ static ClusterState moveClusterStateToErrorStep(Index index, ClusterState cluste
199200
return newClusterStateBuilder.build();
200201
}
201202

203+
ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
204+
ClusterState newState = currentState;
205+
for (String index : indices) {
206+
IndexMetaData indexMetaData = currentState.metaData().index(index);
207+
if (indexMetaData == null) {
208+
throw new IllegalArgumentException("index [" + index + "] does not exist");
209+
}
210+
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(indexMetaData.getSettings());
211+
String failedStep = LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(indexMetaData.getSettings());
212+
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
213+
&& Strings.isNullOrEmpty(failedStep) == false) {
214+
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
215+
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry);
216+
} else {
217+
throw new IllegalArgumentException("cannot retry an action for an index ["
218+
+ index + "] that has not encountered an error when running a Lifecycle Policy");
219+
}
220+
}
221+
return newState;
222+
}
223+
202224
private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSettings, StepKey currentStep, StepKey nextStep,
203225
LongSupplier nowSupplier) {
204226
long nowAsMillis = nowSupplier.getAsLong();
205227
Settings.Builder newSettings = Settings.builder().put(existingSettings).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
206228
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
207229
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, nowAsMillis)
208-
// clear any step info from the current step
230+
// clear any step info or error-related settings from the current step
231+
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, (String) null)
209232
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, (String) null);
210233
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
211234
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowAsMillis);

x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ public ClusterState moveClusterStateToStep(ClusterState currentState, String ind
6767
nowSupplier, policyRegistry);
6868
}
6969

70+
public ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
71+
return lifecycleRunner.moveClusterStateToFailedStep(currentState, indices);
72+
}
73+
7074
SchedulerEngine getScheduler() {
7175
return scheduler.get();
7276
}
@@ -75,6 +79,14 @@ SchedulerEngine.Job getScheduledJob() {
7579
return scheduledJob;
7680
}
7781

82+
public LongSupplier getNowSupplier() {
83+
return nowSupplier;
84+
}
85+
86+
public PolicyStepsRegistry getPolicyRegistry() {
87+
return policyRegistry;
88+
}
89+
7890
@Override
7991
public void clusterChanged(ClusterChangedEvent event) {
8092
if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*
6+
*/
7+
package org.elasticsearch.xpack.indexlifecycle.action;
8+
9+
import org.elasticsearch.client.node.NodeClient;
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.rest.BaseRestHandler;
13+
import org.elasticsearch.rest.RestController;
14+
import org.elasticsearch.rest.RestRequest;
15+
import org.elasticsearch.rest.action.RestToXContentListener;
16+
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
17+
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
18+
19+
public class RestRetryAction extends BaseRestHandler {
20+
21+
public RestRetryAction(Settings settings, RestController controller) {
22+
super(settings);
23+
controller.registerHandler(RestRequest.Method.POST, IndexLifecycle.BASE_PATH + "_retry/{index}", this);
24+
}
25+
26+
@Override
27+
public String getName() {
28+
return "xpack_lifecycle_retry_action";
29+
}
30+
31+
@Override
32+
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
33+
String[] indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
34+
RetryAction.Request request = new RetryAction.Request(indices);
35+
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
36+
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
37+
return channel -> client.execute(RetryAction.INSTANCE, request, new RestToXContentListener<>(channel));
38+
}
39+
}

0 commit comments

Comments
 (0)