Skip to content

Commit bf7ef79

Browse files
committed
Reduced resource consumption of async credential providers.
1. Share thread pools across async credential providers (anything using CachedSupplier's NonBlocking prefetch strategy).
2. Log a warning if an extreme number of concurrent refreshes is happening, to help users detect when they are not closing their credential providers.
Even though this increases resource sharing, it should not increase availability risk: because these threads are only used for background refreshes, availability problems in one particular type of credential provider (e.g. high SSO or STS latency) only disable background refreshes, not prefetches or synchronous fetches.
1 parent 00afb9c commit bf7ef79

File tree

5 files changed

+208
-17
lines changed

5 files changed

+208
-17
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Share background refresh threads across async credential providers to reduce base SDK resource consumption."
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Log a warning when an extreme number of async credential providers are running in parallel, because it could indicate that the user is not closing their clients or credential providers when they are done using them."
6+
}

utils/src/main/java/software/amazon/awssdk/utils/cache/CachedSupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* This should be created using {@link #builder(Supplier)}.
3838
*/
3939
@SdkProtectedApi
40-
public final class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
40+
public class CachedSupplier<T> implements Supplier<T>, SdkAutoCloseable {
4141
/**
4242
* Maximum time to wait for a blocking refresh lock before calling refresh again. This is to rate limit how many times we call
4343
* refresh. In the ideal case, refresh always occurs in a timely fashion and only one thread actually does the refresh.

utils/src/main/java/software/amazon/awssdk/utils/cache/NonBlocking.java

Lines changed: 113 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
import static java.util.concurrent.TimeUnit.MILLISECONDS;
1919

2020
import java.time.Duration;
21-
import java.util.concurrent.ScheduledExecutorService;
21+
import java.util.Random;
2222
import java.util.concurrent.ScheduledThreadPoolExecutor;
23+
import java.util.concurrent.Semaphore;
24+
import java.util.concurrent.SynchronousQueue;
25+
import java.util.concurrent.ThreadLocalRandom;
26+
import java.util.concurrent.ThreadPoolExecutor;
2327
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicLong;
2429
import software.amazon.awssdk.annotations.SdkProtectedApi;
2530
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2631
import software.amazon.awssdk.utils.Logger;
2732
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
28-
import software.amazon.awssdk.utils.Validate;
2933

3034
/**
3135
* A {@link CachedSupplier.PrefetchStrategy} that will run a single thread in the background to update the value. A call to
@@ -37,40 +41,107 @@
3741
public class NonBlocking implements CachedSupplier.PrefetchStrategy {
3842
private static final Logger log = Logger.loggerFor(NonBlocking.class);
3943

44+
/**
45+
* The maximum number of concurrent refreshes allowed across all NonBlocking instances. This is to limit the amount of
46+
* traffic that can be generated by background refreshes. If this is exceeded, a background refresh gets skipped. This just
47+
* increases the chance of latency being pushed to the cached supplier caller, which is preferable to running out of memory.
48+
*/
49+
private static final int MAX_CONCURRENT_REFRESHES = 100;
50+
51+
/**
52+
* By default, how often we periodically call get() on the cached supplier. This may not necessarily call the downstream
53+
* service if the cache is not stale.
54+
*/
55+
private static final Duration DEFAULT_REFRESH_FREQUENCY = Duration.ofSeconds(60);
56+
57+
/**
58+
* By default, how much we jitter the {@link #DEFAULT_REFRESH_FREQUENCY}. This is done to prevent the case that a large
59+
* number of NonBlocking instances are created at once, so they all try to refresh at the same time.
60+
*/
61+
private static final Duration DEFAULT_REFRESH_FREQUENCY_JITTER = Duration.ofSeconds(10);
62+
63+
/**
64+
* The {@link Random} instance used for calculating jitter. See {@link #DEFAULT_REFRESH_FREQUENCY_JITTER}.
65+
*/
66+
private static final Random JITTER_RANDOM = new Random();
67+
68+
/**
69+
* Threads used to periodically kick off credential refreshes based on the {@link #asyncRefreshFrequency}.
70+
*/
71+
private static final ScheduledThreadPoolExecutor SCHEDULER =
72+
new ScheduledThreadPoolExecutor(3, new ThreadFactoryBuilder().threadNamePrefix("sdk-cached-supplier-scheduler")
73+
.daemonThreads(true)
74+
.build());
75+
76+
/**
77+
* Threads used to do the actual work of refreshing the credentials (because the cached supplier might block, so we don't
78+
* want the work to be done by a small thread pool). This executor is created as unbounded, but in reality it is limited by
79+
* the {@link #MAX_CONCURRENT_REFRESHES} via {@link #BACKGROUND_REFRESH_LEASE}.
80+
*/
81+
private static final ThreadPoolExecutor EXECUTOR =
82+
new ThreadPoolExecutor(0,
83+
Integer.MAX_VALUE,
84+
DEFAULT_REFRESH_FREQUENCY.toMillis() + DEFAULT_REFRESH_FREQUENCY_JITTER.toMillis() + 5_000,
85+
MILLISECONDS,
86+
new SynchronousQueue<>(),
87+
new ThreadFactoryBuilder().daemonThreads(true).build());
88+
/**
89+
* A set of leases used to prevent concurrent refreshes beyond the limit described in {@link #MAX_CONCURRENT_REFRESHES}. If
90+
* a lease cannot be acquired, the refresh is skipped.
91+
*/
92+
private static final Semaphore BACKGROUND_REFRESH_LEASE = new Semaphore(MAX_CONCURRENT_REFRESHES);
93+
94+
/**
95+
* An incrementing number, used to uniquely identify an instance of NonBlocking in the {@link #asyncThreadName}.
96+
*/
97+
private static final AtomicLong THREAD_NUMBER = new AtomicLong(0);
98+
4099
/**
41100
* Whether we are currently refreshing the supplier. This is used to make sure only one caller is blocking at a time.
42101
*/
43102
private final AtomicBoolean currentlyRefreshing = new AtomicBoolean(false);
44103

104+
/**
105+
* Name of the thread refreshing the cache for this strategy.
106+
*/
107+
private final String asyncThreadName;
108+
45109
/**
46110
* How frequently to automatically refresh the supplier in the background.
47111
*/
48112
private final Duration asyncRefreshFrequency;
49113

50114
/**
51-
* Single threaded executor to asynchronous refresh the value.
115+
* Whether this strategy has been shutdown (and should stop doing background refreshes)
52116
*/
53-
private final ScheduledExecutorService executor;
117+
private volatile boolean shutdown = false;
54118

55119
/**
56120
* Create a non-blocking prefetch strategy that uses the provided value for the name of the background thread that will be
57121
* performing the update.
58122
*/
59123
public NonBlocking(String asyncThreadName) {
60-
this(asyncThreadName, Duration.ofMinutes(1));
124+
this(asyncThreadName, defaultRefreshFrequency());
125+
}
126+
127+
private static Duration defaultRefreshFrequency() {
128+
// We jitter the default refresh frequency with each instance, so that objects created at the same time will not all be
129+
// refreshing at the exact same times.
130+
int jitter = Math.toIntExact(DEFAULT_REFRESH_FREQUENCY_JITTER.toMillis());
131+
long asyncRefreshFrequency = DEFAULT_REFRESH_FREQUENCY.toMillis() +
132+
JITTER_RANDOM.nextInt(jitter * 2 + 1) - jitter;
133+
return Duration.ofMillis(asyncRefreshFrequency);
61134
}
62135

63136
@SdkTestInternalApi
64137
NonBlocking(String asyncThreadName, Duration asyncRefreshFrequency) {
65-
this.executor = newExecutor(asyncThreadName);
138+
this.asyncThreadName = asyncThreadName + THREAD_NUMBER.getAndIncrement();
66139
this.asyncRefreshFrequency = asyncRefreshFrequency;
67140
}
68141

69-
private static ScheduledExecutorService newExecutor(String asyncThreadName) {
70-
Validate.paramNotBlank(asyncThreadName, "asyncThreadName");
71-
return new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().daemonThreads(true)
72-
.threadNamePrefix(asyncThreadName)
73-
.build());
142+
/**
 * Returns the static {@link #EXECUTOR} that is shared by every NonBlocking instance, so that tests can inspect it
 * (for example, its largest pool size).
 */
@SdkTestInternalApi
143+
static ThreadPoolExecutor executor() {
144+
return EXECUTOR;
74145
}
75146

76147
@Override
@@ -79,10 +150,35 @@ public void initializeCachedSupplier(CachedSupplier<?> cachedSupplier) {
79150
}
80151

81152
private void scheduleRefresh(CachedSupplier<?> cachedSupplier) {
82-
executor.schedule(() -> {
153+
SCHEDULER.schedule(() -> {
154+
Thread.currentThread().setName(asyncThreadName);
155+
156+
if (shutdown) {
157+
return;
158+
}
159+
160+
if (!BACKGROUND_REFRESH_LEASE.tryAcquire()) {
161+
log.warn(() -> "Skipped a background refresh to limit SDK resource consumption. Are you closing your SDK "
162+
+ "resources?");
163+
scheduleRefresh(cachedSupplier);
164+
return;
165+
}
166+
83167
try {
84-
cachedSupplier.get();
85-
} finally {
168+
EXECUTOR.execute(() -> {
169+
try {
170+
Thread.currentThread().setName(asyncThreadName);
171+
cachedSupplier.get();
172+
} catch (Exception e) {
173+
log.error(() -> "Background refresh failed: " + e.getMessage(), e);
174+
} finally {
175+
BACKGROUND_REFRESH_LEASE.release();
176+
scheduleRefresh(cachedSupplier);
177+
}
178+
});
179+
} catch (Throwable t) {
180+
BACKGROUND_REFRESH_LEASE.release();
181+
log.warn(() -> "Failed to submit a background refresh task.", t);
86182
scheduleRefresh(cachedSupplier);
87183
}
88184
}, asyncRefreshFrequency.toMillis(), MILLISECONDS);
@@ -93,7 +189,8 @@ public void prefetch(Runnable valueUpdater) {
93189
// Only run one async refresh at a time.
94190
if (currentlyRefreshing.compareAndSet(false, true)) {
95191
try {
96-
executor.submit(() -> {
192+
EXECUTOR.submit(() -> {
193+
Thread.currentThread().setName(asyncThreadName);
97194
try {
98195
valueUpdater.run();
99196
} catch (RuntimeException e) {
@@ -111,6 +208,6 @@ public void prefetch(Runnable valueUpdater) {
111208

112209
@Override
113210
public void close() {
114-
executor.shutdown();
211+
shutdown = true;
115212
}
116213
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.cache;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.time.Duration;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import org.junit.jupiter.api.MethodOrderer;
24+
import org.junit.jupiter.api.Order;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.TestMethodOrder;
27+
import org.mockito.Mockito;
28+
import org.mockito.stubbing.Answer;
29+
30+
// These tests assert on the largest pool size, so the test order matters (smaller expected sizes before larger expected sizes)
31+
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
32+
public class NonBlockingTest {
33+
@Test
34+
@Order(1)
35+
public void threadsAreSharedBetweenNonBlockingInstances() throws InterruptedException {
36+
List<NonBlocking> nbs = new ArrayList<>();
37+
try {
38+
// Create 99 concurrent non-blocking instances
39+
for (int i = 0; i < 99; i++) {
40+
NonBlocking nb = new NonBlocking("test", Duration.ofMillis(100));
41+
nb.initializeCachedSupplier(Mockito.mock(CachedSupplier.class));
42+
nbs.add(nb);
43+
Thread.sleep(10);
44+
}
45+
46+
Thread.sleep(1_000);
47+
48+
// Make sure we used less-than 99 to do the refreshes.
49+
assertThat(NonBlocking.executor().getLargestPoolSize()).isLessThan(99);
50+
} finally {
51+
nbs.forEach(NonBlocking::close);
52+
}
53+
}
54+
55+
@Test
56+
@Order(2)
57+
public void refreshesAreMaxed() throws InterruptedException {
58+
CachedSupplier<?> slowSupplier = Mockito.mock(CachedSupplier.class);
59+
Mockito.when(slowSupplier.get()).thenAnswer((Answer<Void>) invocation -> {
60+
Thread.sleep(1_000);
61+
return null;
62+
});
63+
64+
List<NonBlocking> nbs = new ArrayList<>();
65+
try {
66+
for (int i = 0; i < 1_000; i++) {
67+
NonBlocking nb = new NonBlocking("test", Duration.ofMillis(0));
68+
nb.initializeCachedSupplier(slowSupplier);
69+
nbs.add(nb);
70+
}
71+
72+
Thread.sleep(1_000);
73+
74+
// In a perfect world this would be capped to 100, but the mechanism we use to limit concurrent refreshes usually
75+
// means more than 100 get created. 150 should be a reasonable limit to check for, because without the limiter it
76+
// would be ~1000.
77+
assertThat(NonBlocking.executor().getLargestPoolSize()).isLessThanOrEqualTo(150);
78+
} finally {
79+
nbs.forEach(NonBlocking::close);
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)