Skip to content

Commit 3cb87af

Browse files
committed
Reduced resource consumption of async credential providers.
1. Share thread pools across async credential providers (anything using CachedSupplier's NonBlocking prefetch strategy). 2. Log a warning if an extreme number of concurrent refreshes are happening, to help users detect when they're not closing their credential providers. Even though this is an increase in resource sharing, it should not cause increased availability risks. Because these threads are only used for background refreshes, if one particular type of credential provider has availability problems (e.g. SSO or STS high latency), it only disables background refreshes, not prefetches or synchronous fetches.
1 parent d246a23 commit 3cb87af

File tree

7 files changed

+331
-55
lines changed

7 files changed

+331
-55
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Share background refresh threads across async credential providers to reduce base SDK resource consumption."
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Log a warning when an extreme number of async credential providers are running in parallel, because it could indicate that the user is not closing their clients or credential providers when they are done using them."
6+
}

utils/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,27 @@
9494
<artifactId>reactive-streams-tck</artifactId>
9595
<scope>test</scope>
9696
</dependency>
97+
<dependency>
98+
<groupId>org.apache.logging.log4j</groupId>
99+
<artifactId>log4j-api</artifactId>
100+
<scope>test</scope>
101+
</dependency>
102+
<dependency>
103+
<groupId>org.apache.logging.log4j</groupId>
104+
<artifactId>log4j-core</artifactId>
105+
<scope>test</scope>
106+
</dependency>
107+
<dependency>
108+
<groupId>org.apache.logging.log4j</groupId>
109+
<artifactId>log4j-slf4j-impl</artifactId>
110+
<scope>test</scope>
111+
</dependency>
112+
<dependency>
113+
<groupId>org.slf4j</groupId>
114+
<artifactId>jcl-over-slf4j</artifactId>
115+
<scope>test</scope>
116+
<version>${slf4j.version}</version>
117+
</dependency>
97118
</dependencies>
98119

99120
<build>

utils/src/main/java/software/amazon/awssdk/utils/cache/CachedSupplier.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* This should be created using {@link #builder(Supplier)}.
3838
*/
3939
@SdkProtectedApi
40-
public final class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
40+
public class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
4141
/**
4242
* Maximum time to wait for a blocking refresh lock before calling refresh again. This is to rate limit how many times we call
4343
* refresh. In the ideal case, refresh always occurs in a timely fashion and only one thread actually does the refresh.
@@ -107,7 +107,7 @@ private boolean cacheIsStale() {
107107
if (cachedValue.staleTime() == null) {
108108
return false;
109109
}
110-
return Instant.now().isAfter(cachedValue.staleTime());
110+
return !Instant.now().isBefore(cachedValue.staleTime());
111111
}
112112

113113
/**
@@ -118,7 +118,7 @@ private boolean shouldInitiateCachePrefetch() {
118118
if (cachedValue.prefetchTime() == null) {
119119
return false;
120120
}
121-
return Instant.now().isAfter(cachedValue.prefetchTime());
121+
return !Instant.now().isBefore(cachedValue.prefetchTime());
122122
}
123123

124124
/**
@@ -146,7 +146,7 @@ private void refreshCache() {
146146
}
147147

148148
// It wasn't, call the supplier to update it.
149-
cachedValue = valueSupplier.get();
149+
cachedValue = prefetchStrategy.fetch(valueSupplier);
150150
}
151151
} finally {
152152
if (lockAcquired) {
@@ -215,6 +215,10 @@ public interface PrefetchStrategy extends SdkAutoCloseable {
215215
*/
216216
void prefetch(Runnable valueUpdater);
217217

218+
default <T> RefreshResult<T> fetch(Supplier<RefreshResult<T>> supplier) {
219+
return supplier.get();
220+
}
221+
218222
/**
219223
* Invoked when the prefetch strategy is registered with a {@link CachedSupplier}.
220224
*/

utils/src/main/java/software/amazon/awssdk/utils/cache/NonBlocking.java

Lines changed: 177 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,26 @@
1515

1616
package software.amazon.awssdk.utils.cache;
1717

18-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
18+
import static java.time.temporal.ChronoUnit.HOURS;
19+
import static java.time.temporal.ChronoUnit.MINUTES;
1920

2021
import java.time.Duration;
21-
import java.util.concurrent.ScheduledExecutorService;
22+
import java.time.Instant;
23+
import java.util.Optional;
24+
import java.util.Random;
25+
import java.util.concurrent.ScheduledFuture;
2226
import java.util.concurrent.ScheduledThreadPoolExecutor;
27+
import java.util.concurrent.SynchronousQueue;
28+
import java.util.concurrent.ThreadPoolExecutor;
29+
import java.util.concurrent.TimeUnit;
2330
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.function.Supplier;
2434
import software.amazon.awssdk.annotations.SdkProtectedApi;
2535
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2636
import software.amazon.awssdk.utils.Logger;
2737
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
28-
import software.amazon.awssdk.utils.Validate;
2938

3039
/**
3140
* A {@link CachedSupplier.PrefetchStrategy} that will run a single thread in the background to update the value. A call to
@@ -37,80 +46,204 @@
3746
public class NonBlocking implements CachedSupplier.PrefetchStrategy {
3847
private static final Logger log = Logger.loggerFor(NonBlocking.class);
3948

49+
/**
50+
* The maximum number of concurrent refreshes before we start logging warnings and skipping refreshes.
51+
*/
52+
private static final int MAX_CONCURRENT_REFRESHES = 100;
53+
54+
/**
55+
* The {@link Random} instance used for calculating jitter of the background prefetches.
56+
*/
57+
private static final Random JITTER_RANDOM = new Random();
58+
59+
/**
60+
* Thread used to kick off refreshes during the prefetch window. This does not do the actual refreshing. That's left for
61+
* the {@link #EXECUTOR}.
62+
*/
63+
private static final ScheduledThreadPoolExecutor SCHEDULER =
64+
new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().daemonThreads(true).build());
65+
66+
/**
67+
* Threads used to do the actual work of refreshing the values (because the cached supplier might block, so we don't
68+
* want the work to be done by a small thread pool). This executor is created as unbounded, but we start complaining and
69+
* skipping refreshes when there are more than {@link #MAX_CONCURRENT_REFRESHES} running.
70+
*/
71+
private static final ThreadPoolExecutor EXECUTOR =
72+
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
73+
60L, TimeUnit.SECONDS,
74+
new SynchronousQueue<>(),
75+
new ThreadFactoryBuilder().daemonThreads(true).build());
76+
77+
/**
78+
* An incrementing number, used to uniquely identify an instance of NonBlocking in the {@link #asyncThreadName}.
79+
*/
80+
private static final AtomicLong THREAD_NUMBER = new AtomicLong(0);
81+
4082
/**
4183
* Whether we are currently refreshing the supplier. This is used to make sure only one caller is blocking at a time.
4284
*/
43-
private final AtomicBoolean currentlyRefreshing = new AtomicBoolean(false);
85+
private final AtomicBoolean currentlyPrefetching = new AtomicBoolean(false);
4486

4587
/**
46-
* How frequently to automatically refresh the supplier in the background.
88+
* Name of the thread refreshing the cache for this strategy.
4789
*/
48-
private final Duration asyncRefreshFrequency;
90+
private final String asyncThreadName;
4991

5092
/**
51-
* Single threaded executor to asynchronous refresh the value.
93+
* The refresh task currently scheduled for this non-blocking instance. We ensure that no more than one task is scheduled
94+
* per instance.
5295
*/
53-
private final ScheduledExecutorService executor;
96+
private final AtomicReference<ScheduledFuture<?>> refreshTask = new AtomicReference<>();
97+
98+
/**
99+
* The minimum amount of time allowed between async refreshes, primarily adjustable for testing purposes.
100+
*/
101+
private final Duration minimumRefreshFrequency;
102+
103+
/**
104+
* Whether this strategy has been shutdown (and should stop doing background refreshes)
105+
*/
106+
private volatile boolean shutdown = false;
107+
108+
/**
109+
* The cached supplier using this non-blocking instance.
110+
*/
111+
private volatile CachedSupplier<?> cachedSupplier;
112+
113+
static {
114+
// Ensure that cancelling a task actually removes it from the queue.
115+
SCHEDULER.setRemoveOnCancelPolicy(true);
116+
}
54117

55118
/**
56119
* Create a non-blocking prefetch strategy that uses the provided value for the name of the background thread that will be
57120
* performing the update.
58121
*/
59122
public NonBlocking(String asyncThreadName) {
60-
this(asyncThreadName, Duration.ofMinutes(1));
123+
this(asyncThreadName, Duration.ofSeconds(60));
61124
}
62125

63126
@SdkTestInternalApi
64-
NonBlocking(String asyncThreadName, Duration asyncRefreshFrequency) {
65-
this.executor = newExecutor(asyncThreadName);
66-
this.asyncRefreshFrequency = asyncRefreshFrequency;
127+
NonBlocking(String asyncThreadName, Duration minimumRefreshFrequency) {
128+
this.asyncThreadName = asyncThreadName + "-" + THREAD_NUMBER.getAndIncrement();
129+
this.minimumRefreshFrequency = minimumRefreshFrequency;
67130
}
68131

69-
private static ScheduledExecutorService newExecutor(String asyncThreadName) {
70-
Validate.paramNotBlank(asyncThreadName, "asyncThreadName");
71-
return new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().daemonThreads(true)
72-
.threadNamePrefix(asyncThreadName)
73-
.build());
132+
@SdkTestInternalApi
133+
static ThreadPoolExecutor executor() {
134+
return EXECUTOR;
74135
}
75136

76137
@Override
77138
public void initializeCachedSupplier(CachedSupplier<?> cachedSupplier) {
78-
scheduleRefresh(cachedSupplier);
139+
this.cachedSupplier = cachedSupplier;
79140
}
80141

81-
private void scheduleRefresh(CachedSupplier<?> cachedSupplier) {
82-
executor.schedule(() -> {
83-
try {
84-
cachedSupplier.get();
85-
} finally {
86-
scheduleRefresh(cachedSupplier);
87-
}
88-
}, asyncRefreshFrequency.toMillis(), MILLISECONDS);
142+
@Override
143+
public void prefetch(Runnable valueUpdater) {
144+
// Only run one async prefetch at a time.
145+
if (currentlyPrefetching.compareAndSet(false, true)) {
146+
tryRunBackgroundTask(valueUpdater, () -> currentlyPrefetching.set(false));
147+
}
89148
}
90149

91150
@Override
92-
public void prefetch(Runnable valueUpdater) {
93-
// Only run one async refresh at a time.
94-
if (currentlyRefreshing.compareAndSet(false, true)) {
95-
try {
96-
executor.submit(() -> {
97-
try {
98-
valueUpdater.run();
99-
} catch (RuntimeException e) {
100-
log.warn(() -> "Exception occurred in AWS SDK background task.", e);
101-
} finally {
102-
currentlyRefreshing.set(false);
103-
}
104-
});
105-
} catch (Throwable e) {
106-
currentlyRefreshing.set(false);
107-
throw e;
108-
}
151+
public <T> RefreshResult<T> fetch(Supplier<RefreshResult<T>> supplier) {
152+
RefreshResult<T> result = supplier.get();
153+
if (result.staleTime() == null || result.prefetchTime() == null) {
154+
return result;
155+
}
156+
157+
getRefreshTime(result).ifPresent(this::schedulePrefetch);
158+
return result;
159+
}
160+
161+
private Optional<Instant> getRefreshTime(RefreshResult<?> result) {
162+
Instant minStart = Instant.now().plus(minimumRefreshFrequency);
163+
Instant rangeStart = result.prefetchTime().isBefore(minStart) ? minStart : result.prefetchTime();
164+
165+
if (Duration.between(Instant.now(), rangeStart).toDays() > 7) {
166+
log.debug(() -> "Skipping background refresh because the prefetch time is too far in the future: " + rangeStart);
167+
return Optional.empty();
168+
}
169+
170+
Instant maxEnd = rangeStart.plus(1, HOURS);
171+
Instant rangeEnd = result.staleTime().isAfter(maxEnd) ? maxEnd : result.staleTime().minus(1, MINUTES);
172+
173+
if (rangeEnd.isBefore(rangeStart)) {
174+
return Optional.of(rangeStart);
175+
}
176+
177+
return Optional.of(randomTimeBetween(rangeStart, rangeEnd));
178+
}
179+
180+
private Instant randomTimeBetween(Instant rangeStart, Instant rangeEnd) {
181+
Duration timeBetween = Duration.between(rangeStart, rangeEnd);
182+
return rangeStart.plusMillis(Math.abs(JITTER_RANDOM.nextLong() % timeBetween.toMillis()));
183+
}
184+
185+
private void schedulePrefetch(Instant refreshTime) {
186+
if (shutdown) {
187+
return;
188+
}
189+
190+
Duration waitTime = Duration.between(Instant.now(), refreshTime);
191+
log.debug(() -> "Scheduling refresh attempt for " + refreshTime + " (in " + waitTime.toMillis() + " ms)");
192+
updateTask(SCHEDULER.schedule(() -> {
193+
Thread.currentThread().setName(asyncThreadName + "-scheduler");
194+
log.debug(() -> "Executing refresh attempt scheduled for " + refreshTime);
195+
196+
// If the supplier has already been prefetched, this will just be a cache hit.
197+
tryRunBackgroundTask(cachedSupplier::get);
198+
}, waitTime.toMillis(), TimeUnit.MILLISECONDS));
199+
200+
if (shutdown) {
201+
updateTask(null);
109202
}
110203
}
111204

112205
@Override
113206
public void close() {
114-
executor.shutdown();
207+
shutdown = true;
208+
updateTask(null);
209+
}
210+
211+
public void updateTask(ScheduledFuture<?> newTask) {
212+
ScheduledFuture<?> currentTask;
213+
do {
214+
currentTask = refreshTask.get();
215+
if (currentTask != null && !currentTask.isDone()) {
216+
currentTask.cancel(false);
217+
}
218+
} while (!refreshTask.compareAndSet(currentTask, newTask));
219+
}
220+
221+
public void tryRunBackgroundTask(Runnable runnable) {
222+
tryRunBackgroundTask(runnable, () -> {
223+
});
224+
}
225+
226+
public void tryRunBackgroundTask(Runnable runnable, Runnable finallyRunnable) {
227+
try {
228+
if (EXECUTOR.getActiveCount() > MAX_CONCURRENT_REFRESHES) {
229+
log.warn(() -> "Skipping a background refresh task because there are too many other tasks running.");
230+
return;
231+
}
232+
233+
EXECUTOR.submit(() -> {
234+
try {
235+
Thread.currentThread().setName(asyncThreadName);
236+
runnable.run();
237+
} catch (Throwable t) {
238+
log.warn(() -> "Exception occurred in AWS SDK background task.", t);
239+
} finally {
240+
finallyRunnable.run();
241+
}
242+
});
243+
} catch (Throwable t) {
244+
log.warn(() -> "Exception occurred when submitting AWS SDK background task.", t);
245+
} finally {
246+
finallyRunnable.run();
247+
}
115248
}
116249
}

0 commit comments

Comments
 (0)