Skip to content

Commit 49f66e0

Browse files
committed
Draft for async retry
1 parent 5852e1b commit 49f66e0

13 files changed

+1096
-214
lines changed

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,48 @@ Here is an example of declarative iteration using Spring AOP to repeat a service
375375

376376
The example above uses a default `RetryTemplate` inside the interceptor. To change the policies or listeners, you only need to inject an instance of `RetryTemplate` into the interceptor.
377377

378+
## Asynchronous retry
379+
### Terms
380+
```java
381+
382+
CompletableFuture<HttpResponse<String>> completableFuture = retryTemplate.execute(
383+
ctx -> httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
384+
);
385+
```
386+
- __async callback__ - a callback that returns one of the supported async types (CompletableFuture, Future). Usually, async retry callback is one, that does not perform a heavy work by itself, but schedules the work to some worker and returns an instance of async type to track the progress. Failure of async callback itself usually means failure of scheduling (but not of actual work).
387+
Failure of async callback (of scheduling) and of actual job will both be retried on a common basis, according to configured policies.
388+
389+
- __job__ - a task with payload, usually heavy, which result will be available through the instance of async type, returned by async callback (and, consequently, by _execute_ method)
390+
391+
- __rescheduling executor__ - an instance of executor, used for scheduling a new retry attempt after a delay (provided by a backoff policy). The type of executor is restricted by ScheduledExecutorService, to take advantage of its "schedule after delay" feature, which allows us to implement backoff without blocking a thread. Rescheduling executor is used for all retries except of initial scheduling retries (initial invocation of async callback).
392+
393+
### Initial invocation of async callback
394+
Invocation of template.execute(asyncCallback) returns when first scheduling of job succeeded, or all initial scheduling attempts failed. Retry template does not produce async containers by itself, therefore there is nothing to return from _execute_ until initial invocation succeed. Backoffs between failing initial scheduling attempts will be performed by default sleeper by means of Thread.sleep() on caller thread. Why this approach is used:
395+
- to be compatible with generic API of RetryOperations (where return type of callback equals to retrun type of execute(...))
396+
- to provide an additional mean of back pressure
397+
398+
### Subsequent invocations of async callback
399+
If the first execution of the _job_ failed and a retry is allowed by the policy, the next invocation of the async callback will be scheduled on _rescheduling executor_
400+
401+
### Async callbacks without executor
402+
If executor is not provided, a backoff will be performed by Thread.sleep() on the client thread (for initial scheduling) or on the worker thread (for job failures, or for subsequent schedulings).
403+
404+
### Configuration example
405+
```java
406+
RetryTemplate.builder()
407+
// activte the async retry feature with an executor
408+
.asyncRetry(Executors.newScheduledThreadPool(1))
409+
.fixedBackoff(1000)
410+
.build();
411+
412+
RetryTemplate.builder()
413+
// activte the async retry feature without an executor.
414+
// Thread.sleep() will be used for backoff.
415+
.asyncRetry()
416+
.fixedBackoff(1000)
417+
.build();
418+
```
419+
378420
## Contributing
379421

380422
Spring Retry is released under the non-restrictive Apache 2.0 license,
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.springframework.retry.backoff;
2+
3+
import java.util.function.Supplier;
4+
5+
public interface LastBackoffPeriodSupplier extends Supplier<Long> {
6+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.springframework.retry.backoff;
2+
3+
import org.apache.commons.logging.Log;
4+
import org.apache.commons.logging.LogFactory;
5+
6+
public class RememberPeriodSleeper implements Sleeper, LastBackoffPeriodSupplier {
7+
8+
private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class);
9+
10+
private volatile Long lastBackoffPeriod;
11+
12+
@Override
13+
public void sleep(long backOffPeriod) {
14+
logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod);
15+
lastBackoffPeriod = backOffPeriod;
16+
}
17+
18+
@Override
19+
public Long get() {
20+
return lastBackoffPeriod;
21+
}
22+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.retry.support;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.CompletionException;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.Future;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.function.Consumer;
26+
import java.util.function.Function;
27+
import java.util.function.Supplier;
28+
29+
import org.apache.commons.logging.Log;
30+
import org.apache.commons.logging.LogFactory;
31+
import org.springframework.retry.RetryContext;
32+
import org.springframework.retry.RetryException;
33+
import org.springframework.retry.backoff.LastBackoffPeriodSupplier;
34+
35+
/**
36+
* @author Dave Syer
37+
*/
38+
public abstract class AsyncRetryResultProcessor<T> implements RetryResultProcessor<T> {
39+
private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class);
40+
41+
protected T doNewAttempt(Supplier<Result<T>> supplier) throws Throwable {
42+
logger.debug("Performing the next async callback invocation...");
43+
return supplier.get().getOrThrow();
44+
}
45+
46+
protected abstract T scheduleNewAttemptAfterDelay(
47+
Supplier<Result<T>> supplier,
48+
ScheduledExecutorService reschedulingExecutor,
49+
long rescheduleAfterMillis,
50+
RetryContext ctx
51+
) throws Throwable;
52+
53+
protected T handleException(Supplier<Result<T>> supplier,
54+
Consumer<Throwable> handler,
55+
Throwable throwable,
56+
ScheduledExecutorService reschedulingExecutor,
57+
LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
58+
RetryContext ctx) {
59+
try {
60+
handler.accept(unwrapIfNeed(throwable));
61+
62+
if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) {
63+
return doNewAttempt(supplier);
64+
} else {
65+
long rescheduleAfterMillis = lastBackoffPeriodSupplier.get();
66+
logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis...");
67+
return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx);
68+
}
69+
}
70+
catch (Throwable t) {
71+
throw RetryTemplate.runtimeException(unwrapIfNeed(t));
72+
}
73+
}
74+
75+
static Throwable unwrapIfNeed(Throwable throwable) {
76+
if (throwable instanceof ExecutionException
77+
|| throwable instanceof CompletionException
78+
|| throwable instanceof RetryException) {
79+
return throwable.getCause();
80+
} else {
81+
return throwable;
82+
}
83+
}
84+
}

src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java

Lines changed: 41 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@
1919
import java.util.concurrent.CompletableFuture;
2020
import java.util.concurrent.CompletionException;
2121
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.TimeUnit;
2224
import java.util.function.Consumer;
2325
import java.util.function.Function;
2426
import java.util.function.Supplier;
2527

28+
import org.apache.commons.logging.Log;
29+
import org.apache.commons.logging.LogFactory;
2630
import org.springframework.retry.RetryCallback;
31+
import org.springframework.retry.RetryContext;
2732
import org.springframework.retry.RetryException;
33+
import org.springframework.retry.backoff.LastBackoffPeriodSupplier;
2834

2935
/**
3036
* A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a
@@ -33,58 +39,47 @@
3339
*
3440
* @author Dave Syer
3541
*/
36-
public class CompletableFutureRetryResultProcessor
37-
implements RetryResultProcessor<CompletableFuture<?>> {
42+
public class CompletableFutureRetryResultProcessor<V>
43+
extends AsyncRetryResultProcessor<CompletableFuture<V>> {
44+
45+
protected final Log logger = LogFactory.getLog(getClass());
3846

3947
@Override
40-
public Result<CompletableFuture<?>> process(CompletableFuture<?> completable,
41-
Supplier<Result<CompletableFuture<?>>> supplier,
42-
Consumer<Throwable> handler) {
43-
@SuppressWarnings("unchecked")
44-
CompletableFuture<Object> typed = (CompletableFuture<Object>) completable;
45-
CompletableFuture<?> handle = typed
46-
.thenApply(value -> CompletableFuture.completedFuture(value))
47-
.exceptionally(throwable -> apply(supplier, handler, throwable))
48+
public Result<CompletableFuture<V>> process(CompletableFuture<V> completable,
49+
Supplier<Result<CompletableFuture<V>>> supplier,
50+
Consumer<Throwable> handler, ScheduledExecutorService reschedulingExecutor,
51+
LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
52+
RetryContext ctx) {
53+
54+
CompletableFuture<V> handle = completable
55+
.thenApply(CompletableFuture::completedFuture)
56+
.exceptionally(throwable -> handleException(
57+
supplier, handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)
58+
)
4859
.thenCompose(Function.identity());
60+
4961
return new Result<>(handle);
5062
}
5163

52-
private CompletableFuture<Object> apply(
53-
Supplier<Result<CompletableFuture<?>>> supplier, Consumer<Throwable> handler,
54-
Throwable throwable) {
55-
Throwable error = throwable;
56-
try {
57-
if (throwable instanceof ExecutionException
58-
|| throwable instanceof CompletionException) {
59-
error = throwable.getCause();
60-
}
61-
handler.accept(error);
62-
Result<CompletableFuture<?>> result = supplier.get();
63-
if (result.isComplete()) {
64-
@SuppressWarnings("unchecked")
65-
CompletableFuture<Object> output = (CompletableFuture<Object>) result
66-
.getResult();
67-
return output;
64+
protected CompletableFuture<V> scheduleNewAttemptAfterDelay(
65+
Supplier<Result<CompletableFuture<V>>> supplier,
66+
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis,
67+
RetryContext ctx)
68+
{
69+
CompletableFuture<CompletableFuture<V>> futureOfFurtherScheduling = new CompletableFuture<>();
70+
71+
reschedulingExecutor.schedule(() -> {
72+
try {
73+
RetrySynchronizationManager.register(ctx);
74+
futureOfFurtherScheduling.complete(doNewAttempt(supplier));
75+
} catch (Throwable t) {
76+
futureOfFurtherScheduling.completeExceptionally(t);
77+
throw RetryTemplate.runtimeException(t);
78+
} finally {
79+
RetrySynchronizationManager.clear();
6880
}
69-
throw result.exception;
70-
}
71-
catch (InterruptedException e) {
72-
Thread.currentThread().interrupt();
73-
error = e;
74-
}
75-
catch (CompletionException e) {
76-
error = e.getCause();
77-
}
78-
catch (ExecutionException e) {
79-
error = e.getCause();
80-
}
81-
catch (RetryException e) {
82-
error = e.getCause();
83-
}
84-
catch (Throwable e) {
85-
error = e;
86-
}
87-
throw RetryTemplate.runtimeException(error);
88-
}
81+
}, rescheduleAfterMillis, TimeUnit.MILLISECONDS);
8982

83+
return futureOfFurtherScheduling.thenCompose(Function.identity());
84+
}
9085
}

0 commit comments

Comments
 (0)