diff --git a/README.md b/README.md index 29653150..19a0ed9f 100644 --- a/README.md +++ b/README.md @@ -551,6 +551,48 @@ The preceding example uses a default `RetryTemplate` inside the interceptor. To policies or listeners, you need only inject an instance of `RetryTemplate` into the interceptor. +## Asynchronous retry +### Terms +```java + +CompletableFuture> completableFuture = retryTemplate.execute( + ctx -> httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + ); +``` +- __async callback__ - a callback that returns one of the supported async types (CompletableFuture, Future). Usually, async retry callback is one, that does not perform a heavy work by itself, but schedules the work to some worker and returns an instance of async type to track the progress. Failure of async callback itself usually means failure of scheduling (but not of actual work). +Failure of async callback (of scheduling) and of actual job will both be retried on a common basis, according to configured policies. + +- __job__ - a task with payload, usually heavy, which result will be available through the instance of async type, returned by async callback (and, consequently, by _execute_ method) + +- __rescheduling executor__ - an instance of executor, used for scheduling a new retry attempt after a delay (provided by a backoff policy). The type of executor is restricted by ScheduledExecutorService, to take advantage of its "schedule after delay" feature, which allows us to implement backoff without blocking a thread. Rescheduling executor is used for all retries except of initial scheduling retries (initial invocation of async callback). + +### Initial invocation of async callback +Invocation of template.execute(asyncCallback) returns when first scheduling of job succeeded, or all initial scheduling attempts failed. Retry template does not produce async containers by itself, therefore there is nothing to return from _execute_ until initial invocation succeed. Backoffs between failing initial scheduling attempts will be performed by default sleeper by means of Thread.sleep() on caller thread. Why this approach is used: +- to be compatible with generic API of RetryOperations (where return type of callback equals to retrun type of execute(...)) +- to provide an additional mean of back pressure + +### Subsequent invocations of async callback +If the first execution of the _job_ failed and a retry is allowed by the policy, the next invocation of the async callback will be scheduled on _rescheduling executor_ + +### Async callbacks without executor +If executor is not provided, a backoff will be performed by Thread.sleep() on the client thread (for initial scheduling) or on the worker thread (for job failures, or for subsequent schedulings). + +### Configuration example +```java +RetryTemplate.builder() + // activte the async retry feature with an executor + .asyncRetry(Executors.newScheduledThreadPool(1)) + .fixedBackoff(1000) + .build(); + +RetryTemplate.builder() + // activte the async retry feature without an executor. + // Thread.sleep() will be used for backoff. + .asyncRetry() + .fixedBackoff(1000) + .build(); +``` + ## Contributing Spring Retry is released under the non-restrictive Apache 2.0 license diff --git a/pom.xml b/pom.xml index a9389a4f..9791a4da 100644 --- a/pom.xml +++ b/pom.xml @@ -320,8 +320,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.6 - 1.6 + 1.8 + 1.8 diff --git a/src/main/java/org/springframework/retry/backoff/BackoffPeriodSupplier.java b/src/main/java/org/springframework/retry/backoff/BackoffPeriodSupplier.java new file mode 100644 index 00000000..bd97cc3e --- /dev/null +++ b/src/main/java/org/springframework/retry/backoff/BackoffPeriodSupplier.java @@ -0,0 +1,7 @@ +package org.springframework.retry.backoff; + +import java.util.function.Supplier; + +public interface BackoffPeriodSupplier extends Supplier { + +} diff --git a/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java b/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java new file mode 100644 index 00000000..10a7feae --- /dev/null +++ b/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java @@ -0,0 +1,23 @@ +package org.springframework.retry.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class RememberPeriodSleeper implements Sleeper, BackoffPeriodSupplier { + + private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class); + + private volatile Long lastBackoffPeriod; + + @Override + public void sleep(long backOffPeriod) { + logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod); + lastBackoffPeriod = backOffPeriod; + } + + @Override + public Long get() { + return lastBackoffPeriod; + } + +} diff --git a/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java b/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java index f769742a..4a540ced 100644 --- a/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java +++ b/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java @@ -25,7 +25,6 @@ import org.springframework.classify.Classifier; import org.springframework.retry.RecoveryCallback; -import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryOperations; import org.springframework.retry.RetryState; diff --git a/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java new file mode 100644 index 00000000..c9b7e60a --- /dev/null +++ b/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java @@ -0,0 +1,79 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryException; +import org.springframework.retry.backoff.BackoffPeriodSupplier; + +/** + * @author Dave Syer + * @param The result type + */ +public abstract class AsyncRetryResultProcessor implements RetryResultProcessor { + + private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class); + + protected T doNewAttempt(Supplier> supplier) throws Throwable { + logger.debug("Performing the next async callback invocation..."); + return supplier.get().getOrThrow(); + } + + protected abstract T scheduleNewAttemptAfterDelay(Supplier> supplier, + ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) + throws Throwable; + + protected T handleException(Supplier> supplier, Consumer handler, Throwable throwable, + ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx) { + try { + handler.accept(unwrapIfNeed(throwable)); + + if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) { + return doNewAttempt(supplier); + } + else { + long rescheduleAfterMillis = lastBackoffPeriodSupplier.get(); + logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis..."); + return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx); + } + } + catch (Throwable t) { + throw RetryTemplate.runtimeException(unwrapIfNeed(t)); + } + } + + static Throwable unwrapIfNeed(Throwable throwable) { + if (throwable instanceof ExecutionException || throwable instanceof CompletionException + || throwable instanceof RetryException) { + return throwable.getCause(); + } + else { + return throwable; + } + } + +} diff --git a/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java new file mode 100644 index 00000000..b5658bc0 --- /dev/null +++ b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java @@ -0,0 +1,80 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.BackoffPeriodSupplier; + +/** + * A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a + * {@link RetryCallback} returns a CompletableFuture this processor can be + * used internally by the {@link RetryTemplate} to wrap it and process the result. + * + * @author Dave Syer + * @param The result type + */ +public class CompletableFutureRetryResultProcessor extends AsyncRetryResultProcessor> { + + protected final Log logger = LogFactory.getLog(getClass()); + + @Override + public Result> process(CompletableFuture completable, + Supplier>> supplier, Consumer handler, + ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx) { + + CompletableFuture handle = completable + .thenApply(CompletableFuture::completedFuture).exceptionally(throwable -> handleException(supplier, + handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)) + .thenCompose(Function.identity()); + + return new Result<>(handle); + } + + protected CompletableFuture scheduleNewAttemptAfterDelay(Supplier>> supplier, + ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) { + CompletableFuture> futureOfFurtherScheduling = new CompletableFuture<>(); + + reschedulingExecutor.schedule(() -> { + try { + RetrySynchronizationManager.register(ctx); + futureOfFurtherScheduling.complete(doNewAttempt(supplier)); + } + catch (Throwable t) { + futureOfFurtherScheduling.completeExceptionally(t); + throw RetryTemplate.runtimeException(t); + } + finally { + RetrySynchronizationManager.clear(); + } + }, rescheduleAfterMillis, TimeUnit.MILLISECONDS); + + return futureOfFurtherScheduling.thenCompose(Function.identity()); + } + +} \ No newline at end of file diff --git a/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java new file mode 100644 index 00000000..5c4f52be --- /dev/null +++ b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java @@ -0,0 +1,193 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.BackoffPeriodSupplier; + +/** + * todo: check or remove after discussion + * + * A {@link RetryResultProcessor} for a plain {@link Future}. If a {@link RetryCallback} + * returns a Future this processor can be used internally by the + * {@link RetryTemplate} to wrap it and process the result. + * + * @author Dave Syer + * @param The result type + */ +public class FutureRetryResultProcessor extends AsyncRetryResultProcessor> { + + @Override + public Result> process(Future future, Supplier>> supplier, + Consumer handler, ScheduledExecutorService reschedulingExecutor, + BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx) { + return new Result<>(new FutureWrapper(future, supplier, handler, this, reschedulingExecutor, + lastBackoffPeriodSupplier, ctx)); + } + + @Override + protected Future scheduleNewAttemptAfterDelay(Supplier>> supplier, + ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) + throws Throwable { + ScheduledFuture> scheduledFuture = reschedulingExecutor.schedule(() -> { + try { + return doNewAttempt(supplier); + } + catch (Throwable t) { + throw RetryTemplate.runtimeException(t); + } + }, rescheduleAfterMillis, TimeUnit.MILLISECONDS); + + return new FutureFlatter(scheduledFuture); + } + + private class FutureWrapper implements Future { + + private Future delegate; + + private Supplier>> supplier; + + private Consumer handler; + + private AsyncRetryResultProcessor> processor; + + private final ScheduledExecutorService reschedulingExecutor; + + private final BackoffPeriodSupplier lastBackoffPeriodSupplier; + + private RetryContext ctx; + + FutureWrapper(Future delegate, Supplier>> supplier, Consumer handler, + AsyncRetryResultProcessor> processor, ScheduledExecutorService reschedulingExecutor, + BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx) { + this.delegate = delegate; + this.supplier = supplier; + this.handler = handler; + this.processor = processor; + this.reschedulingExecutor = reschedulingExecutor; + this.lastBackoffPeriodSupplier = lastBackoffPeriodSupplier; + this.ctx = ctx; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return this.delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return this.delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return this.delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + try { + return this.delegate.get(); + } + catch (Throwable e) { + return processor + .handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) + .get(); + } + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + try { + return this.delegate.get(timeout, unit); + } + catch (Throwable e) { + return processor + .handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) + .get(timeout, unit); + } + } + + } + + private class FutureFlatter implements Future { + + private Future> nestedFuture; + + FutureFlatter(Future> nestedFuture) { + this.nestedFuture = nestedFuture; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + try { + if (this.nestedFuture.isDone()) { + return this.nestedFuture.get().cancel(mayInterruptIfRunning); + } + else { + return this.nestedFuture.cancel(mayInterruptIfRunning); + } + } + catch (Throwable t) { + throw RetryTemplate.runtimeException(t); + } + } + + @Override + public boolean isCancelled() { + try { + return this.nestedFuture.isCancelled() + || (this.nestedFuture.isDone() && this.nestedFuture.get().isCancelled()); + } + catch (Throwable t) { + throw RetryTemplate.runtimeException(t); + } + } + + @Override + public boolean isDone() { + try { + return this.nestedFuture.isDone() && this.nestedFuture.get().isDone(); + } + catch (Throwable t) { + throw RetryTemplate.runtimeException(t); + } + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return this.nestedFuture.get().get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.nestedFuture.get(timeout, unit).get(timeout, unit); + } + + } + +} \ No newline at end of file diff --git a/src/main/java/org/springframework/retry/support/RetryResultProcessor.java b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java new file mode 100644 index 00000000..eb4e44b0 --- /dev/null +++ b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.BackoffPeriodSupplier; + +/** + * @author Dave Syer + * @param the type of result from the retryable operation + */ +public interface RetryResultProcessor { + + Result process(T input, Supplier> supplier, Consumer handler, + ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx); + + public static class Result { + + public Throwable exception; + + private T result; + + private boolean complete; + + public Result(Throwable exception) { + this.exception = exception; + this.complete = false; + } + + public Result(T result) { + this.result = result; + this.complete = true; + } + + boolean isComplete() { + return this.complete; + } + + public Throwable getException() { + return exception; + } + + public T getResult() { + return result; + } + + public T getOrThrow() throws Throwable { + if (isComplete()) { + return result; + } + throw exception; + } + + } + +} diff --git a/src/main/java/org/springframework/retry/support/RetryTemplate.java b/src/main/java/org/springframework/retry/support/RetryTemplate.java index cdf6908b..e3f206fa 100644 --- a/src/main/java/org/springframework/retry/support/RetryTemplate.java +++ b/src/main/java/org/springframework/retry/support/RetryTemplate.java @@ -19,10 +19,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.classify.Classifier; import org.springframework.retry.ExhaustedRetryException; import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryCallback; @@ -36,10 +38,14 @@ import org.springframework.retry.backoff.BackOffContext; import org.springframework.retry.backoff.BackOffInterruptedException; import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.BackoffPeriodSupplier; import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.backoff.RememberPeriodSleeper; +import org.springframework.retry.backoff.SleepingBackOffPolicy; import org.springframework.retry.policy.MapRetryContextCache; import org.springframework.retry.policy.RetryContextCache; import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryResultProcessor.Result; /** * Template class that simplifies the execution of operations with retry semantics. @@ -88,13 +94,19 @@ public class RetryTemplate implements RetryOperations { private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy(); + private volatile BackoffPeriodSupplier lastBackoffPeriodSupplier = null; + private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3); private volatile RetryListener[] listeners = new RetryListener[0]; - private RetryContextCache retryContextCache = new MapRetryContextCache(); + private volatile RetryContextCache retryContextCache = new MapRetryContextCache(); + + private volatile Classifier> processors = null; + + private volatile ScheduledExecutorService reschedulingExecutor = null; - private boolean throwLastExceptionOnExhausted; + private volatile boolean throwLastExceptionOnExhausted; /** * Main entry point to configure RetryTemplate using fluent API. See @@ -132,6 +144,15 @@ public void setRetryContextCache(RetryContextCache retryContextCache) { this.retryContextCache = retryContextCache; } + /** + * Public setter for the retry result processors (if any). Default null (same as + * empty). + * @param processors the processors to set + */ + public void setRetryResultProcessors(Classifier> processors) { + this.processors = processors; + } + /** * Setter for listeners. The listeners are executed before and after a retry block * (i.e. before and after all the attempts), and on an error (every attempt). @@ -142,6 +163,11 @@ public void setListeners(RetryListener[] listeners) { this.listeners = Arrays.asList(listeners).toArray(new RetryListener[listeners.length]); } + public void setReschedulingExecutor(ScheduledExecutorService reschedulingExecutor) { + this.reschedulingExecutor = reschedulingExecutor; + this.backOffPolicy = replaceSleeperIfNeed(backOffPolicy); + } + /** * Register an additional listener. * @param listener the {@link RetryListener} @@ -158,7 +184,20 @@ public void registerListener(RetryListener listener) { * @param backOffPolicy the {@link BackOffPolicy} */ public void setBackOffPolicy(BackOffPolicy backOffPolicy) { - this.backOffPolicy = backOffPolicy; + this.backOffPolicy = replaceSleeperIfNeed(backOffPolicy); + } + + private BackOffPolicy replaceSleeperIfNeed(BackOffPolicy backOffPolicy) { + if (reschedulingExecutor != null && backOffPolicy instanceof SleepingBackOffPolicy) { + this.logger + .debug("Replacing the default sleeper by RememberPeriodSleeper to enable scheduler-based backoff."); + RememberPeriodSleeper rememberPeriodSleeper = new RememberPeriodSleeper(); + lastBackoffPeriodSupplier = rememberPeriodSleeper; + return ((SleepingBackOffPolicy) backOffPolicy).withSleeper(rememberPeriodSleeper); + } + else { + return backOffPolicy; + } } /** @@ -286,89 +325,134 @@ protected T doExecute(RetryCallback retryCallback } } - /* - * We allow the whole loop to be skipped if the policy or context already - * forbid the first try. This is used in the case of external retry to allow a - * recovery in handleRetryExhausted without the callback processing (which - * would throw an exception). - */ - while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { + Result result = loop(retryCallback, state, context, backOffContext); + if (result.isComplete()) { + return result.getResult(); + } + lastException = result.exception; + if (state == null && this.logger.isDebugEnabled()) { + this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount()); + } - try { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Retry: count=" + context.getRetryCount()); - } - // Reset the last exception, so if we are successful - // the close interceptors will not think we failed... - lastException = null; - return retryCallback.doWithRetry(context); - } - catch (Throwable e) { + exhausted = true; + return handleRetryExhausted(recoveryCallback, context, state); - lastException = e; + } + catch (Throwable e) { + lastException = e; + throw RetryTemplate.wrapIfNecessary(e); + } + finally { + close(retryPolicy, context, state, lastException == null || exhausted); + doCloseInterceptors(retryCallback, context, lastException); + RetrySynchronizationManager.clear(); + } - try { - registerThrowable(retryPolicy, state, context, e); - } - catch (Exception ex) { - throw new TerminatedRetryException("Could not register throwable", ex); - } - finally { - doOnErrorInterceptors(retryCallback, context, e); - } + } - if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { - try { - backOffPolicy.backOff(backOffContext); - } - catch (BackOffInterruptedException ex) { - lastException = e; - // back off was prevented by another thread - fail the retry - if (this.logger.isDebugEnabled()) { - this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount()); - } - throw ex; - } - } + private Result safeLoop(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext) { + try { + return loop(retryCallback, state, context, backOffContext); + } + catch (Throwable ex) { + throw runtimeException(ex); + } + } - if (this.logger.isDebugEnabled()) { - this.logger.debug("Checking for rethrow: count=" + context.getRetryCount()); - } + private Result loop(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext) throws E { - if (shouldRethrow(retryPolicy, context, state)) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount()); - } - throw RetryTemplate.wrapIfNecessary(e); - } + Throwable lastException = null; - } + /* + * We allow the whole loop to be skipped if the policy or context already forbid + * the first try. This is used in the case of external retry to allow a recovery + * in handleRetryExhausted without the callback processing (which would throw an + * exception). + */ + while (canRetry(this.retryPolicy, context) && !context.isExhaustedOnly()) { + + try { - /* - * A stateful attempt that can retry may rethrow the exception before now, - * but if we get this far in a stateful retry there's a reason for it, - * like a circuit breaker or a rollback classifier. - */ - if (state != null && context.hasAttribute(GLOBAL_STATE)) { - break; + if (this.logger.isDebugEnabled()) { + this.logger.debug("Retry: count=" + context.getRetryCount()); } + T result = retryCallback.doWithRetry(context); + if (result != null && this.processors != null) { + @SuppressWarnings("unchecked") + RetryResultProcessor processor = (RetryResultProcessor) this.processors.classify(result); + if (processor != null) { + return processor.process(result, () -> safeLoop(retryCallback, state, context, backOffContext), + error -> safeHandleLoopException(retryCallback, state, context, backOffContext, error), + reschedulingExecutor, lastBackoffPeriodSupplier, context); + } + } + return new Result<>(result); + } + catch (Throwable e) { + + lastException = e; + handleLoopException(retryCallback, state, context, backOffContext, e); - if (state == null && this.logger.isDebugEnabled()) { - this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount()); } + /* + * A stateful attempt that can retry may rethrow the exception before now, but + * if we get this far in a stateful retry there's a reason for it, like a + * circuit breaker or a rollback classifier. + */ + if (state != null && context.hasAttribute(GLOBAL_STATE)) { + break; + } + } + return new Result<>(lastException == null ? context.getLastThrowable() : lastException); + } - exhausted = true; - return handleRetryExhausted(recoveryCallback, context, state); + private void safeHandleLoopException(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext, Throwable e) { + try { + handleLoopException(retryCallback, state, context, backOffContext, e); + } + catch (Throwable ex) { + throw runtimeException(ex); + } + } + private void handleLoopException(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext, Throwable e) throws E { + try { + registerThrowable(this.retryPolicy, state, context, e); } - catch (Throwable e) { - throw RetryTemplate.wrapIfNecessary(e); + catch (Exception ex) { + throw new TerminatedRetryException("Could not register throwable", ex); } finally { - close(retryPolicy, context, state, lastException == null || exhausted); - doCloseInterceptors(retryCallback, context, lastException); - RetrySynchronizationManager.clear(); + doOnErrorInterceptors(retryCallback, context, e); + } + + if (canRetry(this.retryPolicy, context) && !context.isExhaustedOnly()) { + try { + this.backOffPolicy.backOff(backOffContext); + } + catch (BackOffInterruptedException ex) { + // back off was prevented by another thread - fail the retry + if (this.logger.isDebugEnabled()) { + this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount()); + } + throw ex; + } + } + + if (this.logger.isDebugEnabled()) { + this.logger.debug("Checking for rethrow: count=" + context.getRetryCount()); + } + + if (shouldRethrow(this.retryPolicy, context, state)) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount()); + } + throw RetryTemplate.wrapIfNecessary(e); } } @@ -414,6 +498,7 @@ protected void registerThrowable(RetryPolicy retryPolicy, RetryState state, Retr registerContext(context, state); } + // есть стейт, есть ключ, сохраняем данный контекст в кэш: ключ -> контекст private void registerContext(RetryContext context, RetryState state) { if (state != null) { Object key = state.getKey(); @@ -590,4 +675,24 @@ else if (throwable instanceof Exception) { } } + /** + * Re-throws the original throwable if it is an RuntimeException, and wraps + * non-exceptions into {@link RetryException}. + * @param throwable the input errror + * @return a RuntimeException if possible + * @throws RetryException if the throwable is checked + */ + public static RuntimeException runtimeException(Throwable throwable) throws RetryException { + if (throwable instanceof Error) { + throw (Error) throwable; + } + else if (throwable instanceof RuntimeException) { + RuntimeException rethrow = (RuntimeException) throwable; + return rethrow; + } + else { + throw new RetryException("Exception in retry", throwable); + } + } + } diff --git a/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java b/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java index dd5bdbad..10e40fbc 100644 --- a/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java +++ b/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java @@ -16,10 +16,17 @@ package org.springframework.retry.support; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; +import org.springframework.classify.SubclassClassifier; import org.springframework.retry.RetryListener; import org.springframework.retry.RetryPolicy; import org.springframework.retry.backoff.BackOffPolicy; @@ -27,6 +34,7 @@ import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.backoff.SleepingBackOffPolicy; import org.springframework.retry.backoff.UniformRandomBackOffPolicy; import org.springframework.retry.policy.AlwaysRetryPolicy; import org.springframework.retry.policy.BinaryExceptionClassifierRetryPolicy; @@ -94,6 +102,10 @@ public class RetryTemplateBuilder { private BinaryExceptionClassifierBuilder classifierBuilder; + private ScheduledExecutorService executorService; + + private Map, RetryResultProcessor> processors = new HashMap<>(); + /* ---------------- Configure retry policy -------------- */ /** @@ -356,6 +368,28 @@ public RetryTemplateBuilder withListeners(List listeners) { return this; } + /* ---------------- Async -------------- */ + + public RetryTemplateBuilder asyncRetry(ScheduledExecutorService reschedulingExecutor) { + this.executorService = reschedulingExecutor; + return asyncRetry(); + } + + /** + * Enable async retry feature. Due to no rescheduling executor is provided, a + * potential backoff will be performed by Thread.sleep(). + * @return A new RetryTemplateBuilder for an async retry + */ + public RetryTemplateBuilder asyncRetry() { + // todo: support interface classification (does not work yet) + this.processors.put(Future.class, new FutureRetryResultProcessor<>()); + this.processors.put(FutureTask.class, new FutureRetryResultProcessor<>()); + this.processors.put(CompletableFuture.class, new CompletableFutureRetryResultProcessor()); + + // todo + return this; + } + /* ---------------- Building -------------- */ /** @@ -399,6 +433,17 @@ public RetryTemplate build() { retryTemplate.setListeners(this.listeners.toArray(new RetryListener[0])); } + // Scheduler + if (this.executorService != null) { + retryTemplate.setReschedulingExecutor(executorService); + + Assert.isTrue(backOffPolicy instanceof SleepingBackOffPolicy, + "Usage of a rescheduling executor makes sense " + "only with an instance of SleepingBackOffPolicy"); + } + + SubclassClassifier> classifier = new SubclassClassifier<>(processors, null); + retryTemplate.setRetryResultProcessors(classifier); + return retryTemplate; } diff --git a/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java b/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java index 8f36e62d..13bf65bf 100644 --- a/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java +++ b/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java @@ -51,7 +51,7 @@ protected void doClose(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); + MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); support.close(context, callback, null); assertEquals(1, callsOnDoCloseMethod.get()); @@ -68,7 +68,7 @@ protected void doClose(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - RetryCallback callback = mock(RetryCallback.class); + RetryCallback callback = mock(RetryCallback.class); support.close(context, callback, null); assertEquals(0, callsOnDoCloseMethod.get()); @@ -96,7 +96,7 @@ protected void doOnError(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); + MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); support.onError(context, callback, null); assertEquals(1, callsOnDoOnErrorMethod.get()); @@ -120,7 +120,7 @@ protected boolean doOpen(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); + MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); assertTrue(support.open(context, callback)); assertEquals(1, callsOnDoOpenMethod.get()); diff --git a/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java b/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java new file mode 100644 index 00000000..5f3c5a51 --- /dev/null +++ b/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java @@ -0,0 +1,232 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.BackOffContext; +import org.springframework.retry.backoff.BackOffInterruptedException; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.RememberPeriodSleeper; +import org.springframework.retry.backoff.Sleeper; + +import static org.junit.Assert.assertTrue; +import static org.springframework.retry.util.test.TestUtils.getPropertyValue; + +/** + * @author Dave Syer + */ +public class AbstractAsyncRetryTest { + + /* + * ---------------- Async callbacks implementations for different types -------------- + */ + + static class CompletableFutureRetryCallback extends AbstractRetryCallback> { + + @Override + public CompletableFuture schedule(Supplier callback, ExecutorService workerExecutor) { + return CompletableFuture.supplyAsync(callback, workerExecutor); + } + + @Override + Object awaitItself(CompletableFuture asyncType) { + return asyncType.join(); + } + + } + + static class FutureRetryCallback extends AbstractRetryCallback> { + + @Override + public Future schedule(Supplier callback, ExecutorService executor) { + return executor.submit(callback::get); + } + + @Override + Object awaitItself(Future asyncType) throws Throwable { + return asyncType.get(); + } + + } + + static abstract class AbstractRetryCallback implements RetryCallback { + + final Object defaultResult = new Object(); + + final Log logger = LogFactory.getLog(getClass()); + + final AtomicInteger jobAttempts = new AtomicInteger(); + + final AtomicInteger schedulingAttempts = new AtomicInteger(); + + volatile int attemptsBeforeSchedulingSuccess; + + volatile int attemptsBeforeJobSuccess; + + volatile RuntimeException exceptionToThrow = new RuntimeException(); + + volatile Function resultSupplier = ctx -> defaultResult; + + volatile Consumer customCodeBeforeScheduling = ctx -> { + }; + + final List schedulerThreadNames = new CopyOnWriteArrayList<>(); + + final List invocationMoments = new CopyOnWriteArrayList<>(); + + final ExecutorService workerExecutor = Executors + .newSingleThreadExecutor(getNamedThreadFactory(WORKER_THREAD_NAME)); + + public abstract A schedule(Supplier callback, ExecutorService executor); + + abstract Object awaitItself(A asyncType) throws Throwable; + + @Override + public A doWithRetry(RetryContext ctx) throws Exception { + rememberThreadName(); + rememberInvocationMoment(); + + throwIfSchedulingTooEarly(); + + customCodeBeforeScheduling.accept(ctx); + + return schedule(() -> { + try { + // a hack to avoid running CompletableFuture#thenApplyAsync in the + // caller thread + Thread.sleep(100L); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + throwIfJobTooEarly(); + logger.debug("Succeeding the callback..."); + return resultSupplier.apply(ctx); + }, workerExecutor); + } + + void rememberInvocationMoment() { + invocationMoments.add(System.currentTimeMillis()); + } + + void rememberThreadName() { + schedulerThreadNames.add(Thread.currentThread().getName()); + } + + void throwIfJobTooEarly() { + if (this.jobAttempts.incrementAndGet() < this.attemptsBeforeJobSuccess) { + logger.debug("Failing job..."); + throw this.exceptionToThrow; + } + } + + void throwIfSchedulingTooEarly() { + if (this.schedulingAttempts.incrementAndGet() < this.attemptsBeforeSchedulingSuccess) { + logger.debug("Failing scheduling..."); + throw this.exceptionToThrow; + } + } + + void setAttemptsBeforeJobSuccess(int attemptsBeforeJobSuccess) { + this.attemptsBeforeJobSuccess = attemptsBeforeJobSuccess; + } + + void setAttemptsBeforeSchedulingSuccess(int attemptsBeforeSchedulingSuccess) { + this.attemptsBeforeSchedulingSuccess = attemptsBeforeSchedulingSuccess; + } + + void setExceptionToThrow(RuntimeException exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + void setResultSupplier(Function resultSupplier) { + this.resultSupplier = resultSupplier; + } + + void setCustomCodeBeforeScheduling(Consumer customCodeBeforeScheduling) { + this.customCodeBeforeScheduling = customCodeBeforeScheduling; + } + + } + + static class MockBackOffStrategy implements BackOffPolicy { + + public int backOffCalls; + + public int startCalls; + + @Override + public BackOffContext start(RetryContext status) { + if (!status.hasAttribute(MockBackOffStrategy.class.getName())) { + this.startCalls++; + status.setAttribute(MockBackOffStrategy.class.getName(), true); + } + return null; + } + + @Override + public void backOff(BackOffContext backOffContext) throws BackOffInterruptedException { + this.backOffCalls++; + } + + } + + /* ---------------- Utilities -------------- */ + + static final String SCHEDULER_THREAD_NAME = "scheduler"; + static final String WORKER_THREAD_NAME = "worker"; + + static ScheduledExecutorService getNamedScheduledExecutor() { + return Executors.newScheduledThreadPool(1, getNamedThreadFactory(AbstractAsyncRetryTest.SCHEDULER_THREAD_NAME)); + } + + static ThreadFactory getNamedThreadFactory(String threadName) { + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName(threadName); + return thread; + } + }; + } + + void assertRememberingSleeper(RetryTemplate template) { + // The sleeper of the backoff policy should be an instance of + // RememberPeriodSleeper, means not Thread.sleep() + BackOffPolicy backOffPolicy = getPropertyValue(template, "backOffPolicy", BackOffPolicy.class); + Sleeper sleeper = getPropertyValue(backOffPolicy, "sleeper", Sleeper.class); + assertTrue(sleeper instanceof RememberPeriodSleeper); + } + +} diff --git a/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java b/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java new file mode 100644 index 00000000..ff77f171 --- /dev/null +++ b/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java @@ -0,0 +1,382 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class AsyncReschedulingTests extends AbstractAsyncRetryTest { + + /* + * Scheduling retry + job immediate success. + * + * - async callback succeeds at 3rd attempt - actual job succeeds on 1st attempt - no + * backoff + */ + @Test + public void testInitialSchedulingEventualSuccessCF() throws Throwable { + doTestInitialSchedulingEventualSuccess(new CompletableFutureRetryCallback()); + } + + @Test + public void testInitialSchedulingEventualSuccessF() throws Throwable { + doTestInitialSchedulingEventualSuccess(new FutureRetryCallback()); + } + + private void doTestInitialSchedulingEventualSuccess(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder().maxAttempts(5).noBackoff().asyncRetry().build(); + + callback.setAttemptsBeforeSchedulingSuccess(3); + callback.setAttemptsBeforeJobSuccess(1); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + + // All invocations before first successful scheduling should be performed by the + // caller thread + assertEquals(Collections.nCopies(3, Thread.currentThread().getName()), callback.schedulerThreadNames); + + assertEquals(1, callback.jobAttempts.get()); + } + + /* + * Immediate success of both scheduling and job. + * + * - async callback, that does not fail itself - actual job succeeds on 1st attempt - + * backoff is not necessary + */ + @Test + public void testImmediateSuccessCF() throws Throwable { + doTestImmediateSuccess(new CompletableFutureRetryCallback()); + } + + @Test + public void testImmediateSuccessF() throws Throwable { + doTestImmediateSuccess(new FutureRetryCallback()); + } + + private void doTestImmediateSuccess(AbstractRetryCallback callback) throws Throwable { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + RetryTemplate template = RetryTemplate.builder().fixedBackoff(10000).asyncRetry(executor).build(); + + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(1); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + + // Single invocation should be performed by the caller thread + assertEquals(Collections.singletonList(Thread.currentThread().getName()), callback.schedulerThreadNames); + + assertEquals(1, callback.jobAttempts.get()); + + // No interaction with the rescheduling executor should be performed if the first + // execution of the job succeeds. + verifyZeroInteractions(executor); + } + + /* + * Async retry with rescheduler. + * + * - async callback, that does not fail itself - actual job succeeds on 3rd attempt - + * backoff is performed using executor, without Thread.sleep() + */ + @Test + public void testAsyncRetryWithReschedulerCF() throws Throwable { + doTestAsyncRetryWithRescheduler(new CompletableFutureRetryCallback()); + } + + @Test + public void testAsyncRetryWithReschedulerF() throws Throwable { + doTestAsyncRetryWithRescheduler(new FutureRetryCallback()); + } + + private void doTestAsyncRetryWithRescheduler(AbstractRetryCallback callback) throws Throwable { + + int targetFixedBackoff = 150; + + ScheduledExecutorService executor = getNamedScheduledExecutor(); + + RetryTemplate template = RetryTemplate.builder().maxAttempts(4).fixedBackoff(targetFixedBackoff) + .asyncRetry(executor).build(); + + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(3); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + assertEquals(3, callback.jobAttempts.get()); + + // All invocations after the first successful scheduling should be performed by + // the the rescheduler thread + assertEquals(Arrays.asList(Thread.currentThread().getName(), SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), + callback.schedulerThreadNames); + + assertRememberingSleeper(template); + + // Expected backoff should be performed + List moments = callback.invocationMoments; + for (int i = 0; i < moments.size() - 1; i++) { + long approxBackoff = moments.get(i + 1) - moments.get(i); + assertTrue(approxBackoff > targetFixedBackoff); + } + } + + /* + * Async retry without backoff + * + * - async callback succeeds on 2nd attempt - actual job succeeds on 3nd attempt - + * default zero backoff is used (which has no sleeper at all), and therefore + * rescheduler executor is not used at all + */ + @Test + public void testAsyncRetryWithoutBackoffCF() throws Throwable { + doTestAsyncRetryWithoutBackoff(new CompletableFutureRetryCallback()); + } + + // todo: problem: a Future can start retrying only when user calls get(). Consider to + // not support Future at all. + /* + * @Test public void testAsyncRetryWithoutBackoffF() throws Throwable { + * doTestAsyncRetryWithoutBackoff(new FutureRetryCallback()); } + */ + + private void doTestAsyncRetryWithoutBackoff(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder().maxAttempts(4).asyncRetry().build(); + + callback.setAttemptsBeforeSchedulingSuccess(2); + callback.setAttemptsBeforeJobSuccess(3); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + assertEquals(4, callback.schedulingAttempts.get()); + assertEquals(3, callback.jobAttempts.get()); + + // All invocations after the first successful scheduling should be performed by + // the + // the worker thread (because not backoff and no rescheduler thread) + assertEquals(Arrays.asList(Thread.currentThread().getName(), Thread.currentThread().getName(), + WORKER_THREAD_NAME, WORKER_THREAD_NAME), callback.schedulerThreadNames); + } + + /* + * Exhausted on scheduling retries + */ + @Test + public void testExhaustOnSchedulingCF() throws Throwable { + doTestExhaustOnScheduling(new CompletableFutureRetryCallback()); + } + + @Test + public void testExhaustOnSchedulingF() throws Throwable { + doTestExhaustOnScheduling(new FutureRetryCallback()); + } + + private void doTestExhaustOnScheduling(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder().maxAttempts(2).asyncRetry().fixedBackoff(100).build(); + + callback.setAttemptsBeforeSchedulingSuccess(5); + callback.setAttemptsBeforeJobSuccess(5); + + try { + callback.awaitItself(template.execute(callback)); + fail("An exception should be thrown above"); + } + catch (Exception e) { + assertSame(e, callback.exceptionToThrow); + } + + assertEquals(Arrays.asList(Thread.currentThread().getName(), Thread.currentThread().getName()), + callback.schedulerThreadNames); + } + + /* + * Exhausted on job retries + */ + @Test + public void testExhaustOnJobWithReschedulerCF() throws Throwable { + doTestExhaustOnJobWithRescheduler(new CompletableFutureRetryCallback()); + } + + @Test + public void testExhaustOnJobWithReschedulerF() throws Throwable { + doTestExhaustOnJobWithRescheduler(new FutureRetryCallback()); + } + + private void doTestExhaustOnJobWithRescheduler(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder().maxAttempts(5).asyncRetry(getNamedScheduledExecutor()) + .exponentialBackoff(10, 2, 100).build(); + + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(6); + + try { + @SuppressWarnings("unused") + Object v = callback.awaitItself(template.execute(callback)); + fail("An exception should be thrown above"); + // Single wrapping by CompletionException is expected by CompletableFuture + // contract + } + catch (Exception ce) { + assertSame(ce.getCause(), callback.exceptionToThrow); + } + + assertEquals(Arrays.asList(Thread.currentThread().getName(), SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), callback.schedulerThreadNames); + } + + // todo: rejected execution + // todo: interrupt executor + // rethrow not too late + + /* + * Nested rescheduling + */ + + @Test + public void testNested() throws Throwable { + ScheduledExecutorService executor = getNamedScheduledExecutor(); + + RetryTemplate outerTemplate = RetryTemplate.builder().infiniteRetry().asyncRetry(executor).fixedBackoff(10) + .build(); + + RetryTemplate innerTemplate = RetryTemplate.builder().infiniteRetry().asyncRetry(executor).fixedBackoff(10) + .build(); + + CompletableFutureRetryCallback innerCallback = new CompletableFutureRetryCallback(); + innerCallback.setAttemptsBeforeSchedulingSuccess(3); + innerCallback.setAttemptsBeforeJobSuccess(3); + innerCallback.setCustomCodeBeforeScheduling(ctx -> { + // The current context should be available via RetrySynchronizationManager + // while scheduling + // (withing user's async callback itself) + assertEquals(ctx, RetrySynchronizationManager.getContext()); + + // We have no control over user's worker thread, so we can not implicitly + // set/get the parent + // context via RetrySynchronizationManager. + assertNull(ctx.getParent()); + }); + innerCallback.setResultSupplier(ctx -> { + // There is no way to implicitly pass the context into the worker thread, + // because the worker executor, + // thread and callback are fully controlled by the user. The retry engine + // deals with only + // scheduling/rescheduling and their result (e.g. CompletableFuture) + assertNull(RetrySynchronizationManager.getContext()); + + return innerCallback.defaultResult; + }); + + CompletableFutureRetryCallback outerCallback = new CompletableFutureRetryCallback(); + outerCallback.setAttemptsBeforeSchedulingSuccess(3); + outerCallback.setAttemptsBeforeJobSuccess(3); + outerCallback.setCustomCodeBeforeScheduling(ctx -> { + // The current context should be available via RetrySynchronizationManager + // while scheduling + // (withing user's async callback itself) + assertEquals(ctx, RetrySynchronizationManager.getContext()); + }); + outerCallback.setResultSupplier(ctx -> { + try { + assertNull(RetrySynchronizationManager.getContext()); + CompletableFuture innerResultFuture = innerTemplate.execute(innerCallback); + assertNull(RetrySynchronizationManager.getContext()); + + Object innerResult = innerCallback.awaitItself(innerResultFuture); + assertNull(RetrySynchronizationManager.getContext()); + + // Return inner result as outer result + return innerResult; + } + catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Object outerResult = outerCallback.awaitItself(outerTemplate.execute(outerCallback)); + assertEquals(innerCallback.defaultResult, outerResult); + + assertEquals(Arrays.asList( + // initial scheduling of the outer callback + Thread.currentThread().getName(), Thread.currentThread().getName(), Thread.currentThread().getName(), + // rescheduling of the outer callback + SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), outerCallback.schedulerThreadNames); + + assertEquals(Arrays.asList( + // initial scheduling of the inner callback + WORKER_THREAD_NAME, WORKER_THREAD_NAME, WORKER_THREAD_NAME, + // rescheduling of the inner callback + SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), innerCallback.schedulerThreadNames); + } + + /* + * Test with additional chained completable futures. + */ + @Test + public void testAdditionalChainedCF() throws Throwable { + + Object additionalInnerResult = new Object(); + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback() { + @Override + public CompletableFuture schedule(Supplier callback, ExecutorService workerExecutor) { + return super.schedule(callback, workerExecutor) + // Additional inner cf + .thenApply(r -> { + assertEquals(this.defaultResult, r); + return additionalInnerResult; + }); + } + }; + RetryTemplate template = RetryTemplate.builder().maxAttempts(4).asyncRetry().build(); + + callback.setAttemptsBeforeSchedulingSuccess(2); + callback.setAttemptsBeforeJobSuccess(3); + + Object additionalOuterResult = new Object(); + CompletableFuture cf = template.execute(callback) + // Additional step + .thenApply(r -> { + assertEquals(additionalInnerResult, r); + return additionalOuterResult; + }); + + assertEquals(additionalOuterResult, callback.awaitItself(cf)); + assertEquals(4, callback.schedulingAttempts.get()); + assertEquals(3, callback.jobAttempts.get()); + + // All invocations after the first successful scheduling should be performed by + // the + // the worker thread (because not backoff and no rescheduler thread) + assertEquals(Arrays.asList(Thread.currentThread().getName(), Thread.currentThread().getName(), + WORKER_THREAD_NAME, WORKER_THREAD_NAME), callback.schedulerThreadNames); + } + + // todo: test stateful rescheduling + // todo: test RejectedExecutionException on rescheduler + // todo: test InterruptedException + // todo: support declarative async + +} diff --git a/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java new file mode 100644 index 00000000..4a8fc057 --- /dev/null +++ b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java @@ -0,0 +1,130 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.junit.Before; +import org.junit.Test; + +import org.springframework.classify.SubclassClassifier; +import org.springframework.retry.policy.SimpleRetryPolicy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author Dave Syer + */ +public class AsyncRetryTemplateTests extends AbstractAsyncRetryTest { + + private RetryTemplate retryTemplate; + + @Before + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void init() { + // org.apache.log4j.BasicConfigurator.configure(); + + Logger root = Logger.getRootLogger(); + root.removeAllAppenders(); + root.addAppender(new ConsoleAppender(new PatternLayout("%r [%t] %p %c{1} %x - %m%n"))); + Logger.getRootLogger().setLevel(Level.TRACE); + + this.retryTemplate = new RetryTemplate(); + Map, RetryResultProcessor> map = new HashMap<>(); + map.put(Future.class, new FutureRetryResultProcessor()); + map.put(CompletableFuture.class, new CompletableFutureRetryResultProcessor()); + SubclassClassifier processors = new SubclassClassifier(map, (RetryResultProcessor) null); + this.retryTemplate.setRetryResultProcessors(processors); + } + + @Test + public void testSuccessfulRetryCompletable() throws Throwable { + for (int x = 1; x <= 10; x++) { + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(x); + SimpleRetryPolicy policy = new SimpleRetryPolicy(x); + this.retryTemplate.setRetryPolicy(policy); + CompletableFuture result = this.retryTemplate.execute(callback); + assertEquals(callback.defaultResult, result.get(10000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.jobAttempts.get()); + } + } + + // todo: remove of fix after discussion + /* + * @Test public void testSuccessfulRetryFuture() throws Throwable { for (int x = 1; x + * <= 10; x++) { FutureRetryCallback callback = new FutureRetryCallback(); + * callback.setAttemptsBeforeSchedulingSuccess(1); + * callback.setAttemptsBeforeJobSuccess(x); SimpleRetryPolicy policy = new + * SimpleRetryPolicy(x + 1); this.retryTemplate.setRetryPolicy(policy); Future + * result = this.retryTemplate.execute(callback); assertEquals(callback.defaultResult, + * result.get(10000L, TimeUnit.MILLISECONDS)); assertEquals(x, + * callback.jobAttempts.get()); } } + */ + + @Test + public void testBackOffInvoked() throws Throwable { + for (int x = 1; x <= 10; x++) { + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); + MockBackOffStrategy backOff = new MockBackOffStrategy(); + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(x); + SimpleRetryPolicy policy = new SimpleRetryPolicy(10); + this.retryTemplate.setRetryPolicy(policy); + this.retryTemplate.setBackOffPolicy(backOff); + CompletableFuture result = this.retryTemplate.execute(callback); + assertEquals(callback.defaultResult, result.get(10000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.jobAttempts.get()); + assertEquals(1, backOff.startCalls); + assertEquals(x - 1, backOff.backOffCalls); + } + } + + @Test + public void testNoSuccessRetry() throws Throwable { + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); + // Something that won't be thrown by JUnit... + callback.setExceptionToThrow(new IllegalArgumentException()); + callback.setAttemptsBeforeJobSuccess(Integer.MAX_VALUE); + int retryAttempts = 2; + this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryAttempts)); + try { + CompletableFuture result = this.retryTemplate.execute(callback); + result.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected IllegalArgumentException"); + } + catch (ExecutionException e) { + assertTrue("Expected IllegalArgumentException", e.getCause() instanceof IllegalArgumentException); + assertEquals(retryAttempts, callback.jobAttempts.get()); + return; + } + fail("Expected IllegalArgumentException"); + } + +} diff --git a/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java b/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java index 77f7e573..63eb46e7 100644 --- a/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java +++ b/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java @@ -140,7 +140,8 @@ public String recover(RetryContext context) { } }; Object result = null; - // On the second retry, the recovery path is taken... + // The recovery path is taken just after the first attempt. + // No rethrow, due to no rollback is required for this type of exception. result = this.retryTemplate.execute(callback, recoveryCallback, state); assertEquals(input, result); // default result is the item assertEquals(1, this.count);