Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*
*/
package org.elasticsearch.xpack.core.indexlifecycle.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.IndicesOptions.Option;
import org.elasticsearch.action.support.IndicesOptions.WildcardStates;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;

import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Objects;

public class RetryAction extends Action<RetryAction.Request, RetryAction.Response, RetryAction.RequestBuilder> {
public static final RetryAction INSTANCE = new RetryAction();
public static final String NAME = "indices:admin/xpack/index_lifecycle/_retry/post";

protected RetryAction() {
super(NAME);
}

@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}

@Override
public Response newResponse() {
return new Response();
}

public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {

protected RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
super(client, action, new Request());
}

}

public static class Response extends AcknowledgedResponse implements ToXContentObject {

public Response() {
}

public Response(boolean acknowledged) {
super(acknowledged);
}

@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}

@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(isAcknowledged(), other.isAcknowledged());
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}

}

public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
private String[] indices;

public Request(String... indices) {
this.indices = indices;
}

public Request() {
}

@Override
public Request indices(String... indices) {
this.indices = indices;
return this;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
// Re-run should only resolve to open concrete indices (not aliases)
return new IndicesOptions(EnumSet.of(Option.IGNORE_ALIASES), EnumSet.of(WildcardStates.OPEN));
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.indices = in.readStringArray();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
}

@Override
public int hashCode() {
return Arrays.hashCode(indices);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Arrays.equals(indices, other.indices);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*
*/
package org.elasticsearch.xpack.core.indexlifecycle.action;

import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;

public class ReRunRequestTests extends AbstractStreamableTestCase<Request> {

@Override
protected Request createTestInstance() {
String[] indices = new String[randomIntBetween(1, 10)];
for (int i = 0; i < indices.length; i++) {
indices[i] = randomAlphaOfLengthBetween(2, 5);
}
return new Request(indices);
}

@Override
protected Request createBlankInstance() {
return new Request();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*
*/
package org.elasticsearch.xpack.core.indexlifecycle.action;

import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;

public class ReRunResponseTests extends AbstractStreamableTestCase<Response> {

@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}

@Override
protected Response createBlankInstance() {
return new Response();
}

@Override
protected Response mutateInstance(Response response) {
return new Response(response.isAcknowledged() == false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.MoveToStepAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestMoveToStepAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestPutLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestRetryAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportDeleteLifcycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportGetLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportMoveToStepAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportPutLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportRetryAction;

import java.time.Clock;
import java.util.ArrayList;
Expand Down Expand Up @@ -140,7 +143,9 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestPutLifecycleAction(settings, restController),
new RestGetLifecycleAction(settings, restController),
new RestDeleteLifecycleAction(settings, restController),
new RestMoveToStepAction(settings, restController));
new RestMoveToStepAction(settings, restController),
new RestRetryAction(settings, restController)
);
}

@Override
Expand All @@ -152,7 +157,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new ActionHandler<>(PutLifecycleAction.INSTANCE, TransportPutLifecycleAction.class),
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifcycleAction.class),
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class));
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class),
new ActionHandler<>(RetryAction.INSTANCE, TransportRetryAction.class));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.io.IOException;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class IndexLifecycleRunner {
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleRunner.class);
Expand Down Expand Up @@ -127,7 +128,7 @@ private void executeClusterStateSteps(Index index, String policy, Step step) {
* @param indexSettings
* the index settings to extract the {@link StepKey} from.
*/
static StepKey getCurrentStepKey(Settings indexSettings) {
public static StepKey getCurrentStepKey(Settings indexSettings) {
String currentPhase = LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(indexSettings);
String currentAction = LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(indexSettings);
String currentStep = LifecycleSettings.LIFECYCLE_STEP_SETTING.get(indexSettings);
Expand Down Expand Up @@ -199,13 +200,35 @@ static ClusterState moveClusterStateToErrorStep(Index index, ClusterState cluste
return newClusterStateBuilder.build();
}

ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
ClusterState newState = currentState;
for (String index : indices) {
IndexMetaData indexMetaData = currentState.metaData().index(index);
if (indexMetaData == null) {
throw new IllegalArgumentException("index [" + index + "] does not exist");
}
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(indexMetaData.getSettings());
String failedStep = LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(indexMetaData.getSettings());
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
&& Strings.isNullOrEmpty(failedStep) == false) {
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry);
} else {
throw new IllegalArgumentException("cannot retry an action for an index ["
+ index + "] that has not encountered an error when running a Lifecycle Policy");
}
}
return newState;
}

private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSettings, StepKey currentStep, StepKey nextStep,
LongSupplier nowSupplier) {
long nowAsMillis = nowSupplier.getAsLong();
Settings.Builder newSettings = Settings.builder().put(existingSettings).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, nowAsMillis)
// clear any step info from the current step
// clear any step info or error-related settings from the current step
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, (String) null)
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, (String) null);
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowAsMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public ClusterState moveClusterStateToStep(ClusterState currentState, String ind
nowSupplier, policyRegistry);
}

public ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
return lifecycleRunner.moveClusterStateToFailedStep(currentState, indices);
}

SchedulerEngine getScheduler() {
return scheduler.get();
}
Expand All @@ -75,6 +79,14 @@ SchedulerEngine.Job getScheduledJob() {
return scheduledJob;
}

public LongSupplier getNowSupplier() {
return nowSupplier;
}

public PolicyStepsRegistry getPolicyRegistry() {
return policyRegistry;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*
*/
package org.elasticsearch.xpack.indexlifecycle.action;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;

public class RestRetryAction extends BaseRestHandler {

public RestRetryAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST, IndexLifecycle.BASE_PATH + "_retry/{index}", this);
}

@Override
public String getName() {
return "xpack_lifecycle_retry_action";
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
String[] indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
RetryAction.Request request = new RetryAction.Request(indices);
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
return channel -> client.execute(RetryAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}
Loading