18
18
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
19
19
20
20
import java .time .Duration ;
21
- import java .util .concurrent . ScheduledExecutorService ;
21
+ import java .util .Random ;
22
22
import java .util .concurrent .ScheduledThreadPoolExecutor ;
23
+ import java .util .concurrent .Semaphore ;
24
+ import java .util .concurrent .SynchronousQueue ;
25
+ import java .util .concurrent .ThreadPoolExecutor ;
23
26
import java .util .concurrent .atomic .AtomicBoolean ;
27
+ import java .util .concurrent .atomic .AtomicLong ;
24
28
import software .amazon .awssdk .annotations .SdkProtectedApi ;
25
29
import software .amazon .awssdk .annotations .SdkTestInternalApi ;
26
30
import software .amazon .awssdk .utils .Logger ;
27
31
import software .amazon .awssdk .utils .ThreadFactoryBuilder ;
28
- import software .amazon .awssdk .utils .Validate ;
29
32
30
33
/**
31
34
* A {@link CachedSupplier.PrefetchStrategy} that will run a single thread in the background to update the value. A call to
37
40
public class NonBlocking implements CachedSupplier .PrefetchStrategy {
38
41
private static final Logger log = Logger .loggerFor (NonBlocking .class );
39
42
43
+ /**
44
+ * The maximum number of concurrent refreshes allowed across all NonBlocking instances. This is to limit the amount of
45
+ * traffic that can be generated by background refreshes. If this is exceeded, a background refresh gets skipped. This just
46
+ * increases the chance of latency being pushed to the cached supplier caller, which is preferable to running out of memory.
47
+ */
48
+ private static final int MAX_CONCURRENT_REFRESHES = 100 ;
49
+
50
+ /**
51
+ * By default, how often we periodically call get() on the cached supplier. This may not necessarily call the downstream
52
+ * service if the cache is not stale.
53
+ */
54
+ private static final Duration DEFAULT_REFRESH_FREQUENCY = Duration .ofSeconds (60 );
55
+
56
+ /**
57
+ * By default, how much we jitter the {@link #DEFAULT_REFRESH_FREQUENCY}. This is done to prevent the case that a large
58
+ * number of NonBlocking instances are created at once, so they all try to refresh at the same time.
59
+ */
60
+ private static final Duration DEFAULT_REFRESH_FREQUENCY_JITTER = Duration .ofSeconds (10 );
61
+
62
+ /**
63
+ * The {@link Random} instance used for calculating jitter. See {@link #DEFAULT_REFRESH_FREQUENCY_JITTER}.
64
+ */
65
+ private static final Random JITTER_RANDOM = new Random ();
66
+
67
+ /**
68
+ * Threads used to periodically kick off credential refreshes based on the {@link #asyncRefreshFrequency}.
69
+ */
70
+ private static final ScheduledThreadPoolExecutor SCHEDULER =
71
+ new ScheduledThreadPoolExecutor (3 , new ThreadFactoryBuilder ().threadNamePrefix ("sdk-cached-supplier-scheduler" )
72
+ .daemonThreads (true )
73
+ .build ());
74
+
75
+ /**
76
+ * Threads used to do the actual work of refreshing the credentials (because the cached supplier might block, so we don't
77
+ * want the work to be done by a small thread pool). This executor is created as unbounded, but in reality it is limited by
78
+ * the {@link #MAX_CONCURRENT_REFRESHES} via {@link #BACKGROUND_REFRESH_LEASE}.
79
+ */
80
+ private static final ThreadPoolExecutor EXECUTOR =
81
+ new ThreadPoolExecutor (0 ,
82
+ Integer .MAX_VALUE ,
83
+ DEFAULT_REFRESH_FREQUENCY .toMillis () + DEFAULT_REFRESH_FREQUENCY_JITTER .toMillis () + 5_000 ,
84
+ MILLISECONDS ,
85
+ new SynchronousQueue <>(),
86
+ new ThreadFactoryBuilder ().daemonThreads (true ).build ());
87
+ /**
88
+ * A set of leases used to prevent concurrent refreshes beyond the limit described in {@link #MAX_CONCURRENT_REFRESHES}. If
89
+ * a lease cannot be acquired, the refresh is skipped.
90
+ */
91
+ private static final Semaphore BACKGROUND_REFRESH_LEASE = new Semaphore (MAX_CONCURRENT_REFRESHES );
92
+
93
+ /**
94
+ * An incrementing number, used to uniquely identify an instance of NonBlocking in the {@link #asyncThreadName}.
95
+ */
96
+ private static final AtomicLong THREAD_NUMBER = new AtomicLong (0 );
97
+
40
98
/**
41
99
* Whether we are currently refreshing the supplier. This is used to make sure only one caller is blocking at a time.
42
100
*/
43
101
private final AtomicBoolean currentlyRefreshing = new AtomicBoolean (false );
44
102
103
+ /**
104
+ * Name of the thread refreshing the cache for this strategy.
105
+ */
106
+ private final String asyncThreadName ;
107
+
45
108
/**
46
109
* How frequently to automatically refresh the supplier in the background.
47
110
*/
48
111
private final Duration asyncRefreshFrequency ;
49
112
50
113
/**
51
- * Single threaded executor to asynchronous refresh the value.
114
+ * Whether this strategy has been shutdown (and should stop doing background refreshes)
52
115
*/
53
- private final ScheduledExecutorService executor ;
116
+ private volatile boolean shutdown = false ;
54
117
55
118
/**
56
119
* Create a non-blocking prefetch strategy that uses the provided value for the name of the background thread that will be
57
120
* performing the update.
58
121
*/
59
122
public NonBlocking (String asyncThreadName ) {
60
- this (asyncThreadName , Duration . ofMinutes ( 1 ));
123
+ this (asyncThreadName , defaultRefreshFrequency ( ));
61
124
}
62
125
63
126
@ SdkTestInternalApi
64
127
NonBlocking (String asyncThreadName , Duration asyncRefreshFrequency ) {
65
- this .executor = newExecutor ( asyncThreadName );
128
+ this .asyncThreadName = asyncThreadName + THREAD_NUMBER . getAndIncrement ( );
66
129
this .asyncRefreshFrequency = asyncRefreshFrequency ;
67
130
}
68
131
69
- private static ScheduledExecutorService newExecutor (String asyncThreadName ) {
70
- Validate .paramNotBlank (asyncThreadName , "asyncThreadName" );
71
- return new ScheduledThreadPoolExecutor (1 , new ThreadFactoryBuilder ().daemonThreads (true )
72
- .threadNamePrefix (asyncThreadName )
73
- .build ());
132
+ private static Duration defaultRefreshFrequency () {
133
+ // We jitter the default refresh frequency with each instance, so that objects created at the same time will not all be
134
+ // refreshing at the exact same times.
135
+ int jitter = Math .toIntExact (DEFAULT_REFRESH_FREQUENCY_JITTER .toMillis ());
136
+ long asyncRefreshFrequency = DEFAULT_REFRESH_FREQUENCY .toMillis () +
137
+ JITTER_RANDOM .nextInt (jitter * 2 + 1 ) - jitter ;
138
+ return Duration .ofMillis (asyncRefreshFrequency );
139
+ }
140
+
141
+ @ SdkTestInternalApi
142
+ static ThreadPoolExecutor executor () {
143
+ return EXECUTOR ;
74
144
}
75
145
76
146
@ Override
@@ -79,10 +149,35 @@ public void initializeCachedSupplier(CachedSupplier<?> cachedSupplier) {
79
149
}
80
150
81
151
private void scheduleRefresh (CachedSupplier <?> cachedSupplier ) {
82
- executor .schedule (() -> {
152
+ SCHEDULER .schedule (() -> {
153
+ Thread .currentThread ().setName (asyncThreadName );
154
+
155
+ if (shutdown ) {
156
+ return ;
157
+ }
158
+
159
+ if (!BACKGROUND_REFRESH_LEASE .tryAcquire ()) {
160
+ log .warn (() -> "Skipped a background refresh to limit SDK resource consumption. Are you closing your SDK "
161
+ + "resources?" );
162
+ scheduleRefresh (cachedSupplier );
163
+ return ;
164
+ }
165
+
83
166
try {
84
- cachedSupplier .get ();
85
- } finally {
167
+ EXECUTOR .execute (() -> {
168
+ Thread .currentThread ().setName (asyncThreadName );
169
+ try {
170
+ cachedSupplier .get ();
171
+ } catch (Exception e ) {
172
+ log .warn (() -> "Exception occurred in AWS SDK background task." , e );
173
+ } finally {
174
+ BACKGROUND_REFRESH_LEASE .release ();
175
+ scheduleRefresh (cachedSupplier );
176
+ }
177
+ });
178
+ } catch (Throwable t ) {
179
+ BACKGROUND_REFRESH_LEASE .release ();
180
+ log .warn (() -> "Failed to submit a background refresh task." , t );
86
181
scheduleRefresh (cachedSupplier );
87
182
}
88
183
}, asyncRefreshFrequency .toMillis (), MILLISECONDS );
@@ -93,7 +188,8 @@ public void prefetch(Runnable valueUpdater) {
93
188
// Only run one async refresh at a time.
94
189
if (currentlyRefreshing .compareAndSet (false , true )) {
95
190
try {
96
- executor .submit (() -> {
191
+ EXECUTOR .submit (() -> {
192
+ Thread .currentThread ().setName (asyncThreadName );
97
193
try {
98
194
valueUpdater .run ();
99
195
} catch (RuntimeException e ) {
@@ -111,6 +207,6 @@ public void prefetch(Runnable valueUpdater) {
111
207
112
208
@ Override
113
209
public void close () {
114
- executor . shutdown () ;
210
+ shutdown = true ;
115
211
}
116
212
}
0 commit comments