18
18
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
19
19
20
20
import java .time .Duration ;
21
- import java .util .concurrent .ScheduledExecutorService ;
22
21
import java .util .concurrent .ScheduledThreadPoolExecutor ;
22
+ import java .util .concurrent .Semaphore ;
23
+ import java .util .concurrent .SynchronousQueue ;
24
+ import java .util .concurrent .ThreadLocalRandom ;
25
+ import java .util .concurrent .ThreadPoolExecutor ;
23
26
import java .util .concurrent .atomic .AtomicBoolean ;
27
+ import java .util .concurrent .atomic .AtomicLong ;
24
28
import software .amazon .awssdk .annotations .SdkProtectedApi ;
25
29
import software .amazon .awssdk .annotations .SdkTestInternalApi ;
26
30
import software .amazon .awssdk .utils .Logger ;
27
31
import software .amazon .awssdk .utils .ThreadFactoryBuilder ;
28
- import software .amazon .awssdk .utils .Validate ;
29
32
30
33
/**
31
34
* A {@link CachedSupplier.PrefetchStrategy} that will run a single thread in the background to update the value. A call to
37
40
public class NonBlocking implements CachedSupplier .PrefetchStrategy {
38
41
private static final Logger log = Logger .loggerFor (NonBlocking .class );
39
42
43
+ /**
44
+ * The maximum number of concurrent refreshes allowed across all NonBlocking instances. This is to limit the amount of
45
+ * traffic that can be generated by background refreshes. If this is exceeded, a background refresh gets skipped. This just
46
+ * increases the chance of latency being pushed to the cached supplier caller, which is preferable to running out of memory.
47
+ */
48
+ private static final int MAX_CONCURRENT_REFRESHES = 100 ;
49
+
50
+ /**
51
+ * By default, how often we periodically call get() on the cached supplier. This may not necessarily call the downstream
52
+ * service if the cache is not stale.
53
+ */
54
+ private static final Duration DEFAULT_REFRESH_FREQUENCY = Duration .ofSeconds (60 );
55
+
56
+ /**
57
+ * By default, how much we jitter the {@link #DEFAULT_REFRESH_FREQUENCY}. This is done to prevent the case that a large
58
+ * number of NonBlocking instances are created at once, so they all try to refresh at the same time.
59
+ */
60
+ private static final Duration DEFAULT_REFRESH_FREQUENCY_JITTER = Duration .ofSeconds (10 );
61
+
62
+ /**
63
+ * Threads used to periodically kick off credential refreshes based on the {@link #asyncRefreshFrequency}.
64
+ */
65
+ private static final ScheduledThreadPoolExecutor SCHEDULER =
66
+ new ScheduledThreadPoolExecutor (3 , new ThreadFactoryBuilder ().threadNamePrefix ("sdk-cached-supplier-scheduler" )
67
+ .daemonThreads (true )
68
+ .build ());
69
+
70
+ /**
71
+ * Threads used to do the actual work of refreshing the credentials (because the cached supplier might block, so we don't
72
+ * want the work to be done by a small thread pool). This executor is created as unbounded, but in reality it is limited by
73
+ * the {@link #MAX_CONCURRENT_REFRESHES} via {@link #BACKGROUND_REFRESH_LEASE}.
74
+ */
75
+ private static final ThreadPoolExecutor EXECUTOR =
76
+ new ThreadPoolExecutor (0 ,
77
+ Integer .MAX_VALUE ,
78
+ DEFAULT_REFRESH_FREQUENCY .toMillis () + DEFAULT_REFRESH_FREQUENCY_JITTER .toMillis () + 5_000 ,
79
+ MILLISECONDS ,
80
+ new SynchronousQueue <>(),
81
+ new ThreadFactoryBuilder ().daemonThreads (true ).build ());
82
+ /**
83
+ * A set of leases used to prevent concurrent refreshes beyond the limit described in {@link #MAX_CONCURRENT_REFRESHES}. If
84
+ * a lease cannot be acquired, the refresh is skipped.
85
+ */
86
+ private static final Semaphore BACKGROUND_REFRESH_LEASE = new Semaphore (MAX_CONCURRENT_REFRESHES );
87
+
88
+ /**
89
+ * An incrementing number, used to uniquely identify an instance of NonBlocking in the {@link #asyncThreadName}.
90
+ */
91
+ private static final AtomicLong THREAD_NUMBER = new AtomicLong (0 );
92
+
40
93
/**
41
94
* Whether we are currently refreshing the supplier. This is used to make sure only one caller is blocking at a time.
42
95
*/
43
96
private final AtomicBoolean currentlyRefreshing = new AtomicBoolean (false );
44
97
98
+ /**
99
+ * Name of the thread refreshing the cache for this strategy.
100
+ */
101
+ private final String asyncThreadName ;
102
+
45
103
/**
46
104
* How frequently to automatically refresh the supplier in the background.
47
105
*/
48
106
private final Duration asyncRefreshFrequency ;
49
107
50
108
/**
51
- * Single threaded executor to asynchronous refresh the value.
109
+ * Whether this strategy has been shutdown (and should stop doing background refreshes)
52
110
*/
53
- private final ScheduledExecutorService executor ;
111
+ private volatile boolean shutdown = false ;
54
112
55
113
/**
56
114
* Create a non-blocking prefetch strategy that uses the provided value for the name of the background thread that will be
57
115
* performing the update.
58
116
*/
59
117
public NonBlocking (String asyncThreadName ) {
60
- this (asyncThreadName , Duration .ofMinutes (1 ));
118
+ this (asyncThreadName , defaultRefreshFrequency ());
119
+ }
120
+
121
+ private static Duration defaultRefreshFrequency () {
122
+ // We jitter the default refresh frequency with each instance, so that objects created at the same time will not all be
123
+ // refreshing at the exact same times.
124
+ long jitter = DEFAULT_REFRESH_FREQUENCY_JITTER .toMillis ();
125
+ long asyncRefreshFrequency = DEFAULT_REFRESH_FREQUENCY .toMillis () +
126
+ ThreadLocalRandom .current ().nextLong (-jitter , jitter + 1 );
127
+ return Duration .ofMillis (asyncRefreshFrequency );
61
128
}
62
129
63
130
@ SdkTestInternalApi
64
131
NonBlocking (String asyncThreadName , Duration asyncRefreshFrequency ) {
65
- this .executor = newExecutor ( asyncThreadName );
132
+ this .asyncThreadName = asyncThreadName + THREAD_NUMBER . getAndIncrement ( );
66
133
this .asyncRefreshFrequency = asyncRefreshFrequency ;
67
134
}
68
135
69
- private static ScheduledExecutorService newExecutor (String asyncThreadName ) {
70
- Validate .paramNotBlank (asyncThreadName , "asyncThreadName" );
71
- return new ScheduledThreadPoolExecutor (1 , new ThreadFactoryBuilder ().daemonThreads (true )
72
- .threadNamePrefix (asyncThreadName )
73
- .build ());
136
+ @ SdkTestInternalApi
137
+ static ThreadPoolExecutor executor () {
138
+ return EXECUTOR ;
74
139
}
75
140
76
141
@ Override
@@ -79,10 +144,35 @@ public void initializeCachedSupplier(CachedSupplier<?> cachedSupplier) {
79
144
}
80
145
81
146
private void scheduleRefresh (CachedSupplier <?> cachedSupplier ) {
82
- executor .schedule (() -> {
147
+ SCHEDULER .schedule (() -> {
148
+ Thread .currentThread ().setName (asyncThreadName );
149
+
150
+ if (shutdown ) {
151
+ return ;
152
+ }
153
+
154
+ if (!BACKGROUND_REFRESH_LEASE .tryAcquire ()) {
155
+ log .warn (() -> "Skipped a background refresh to limit SDK resource consumption. Are you closing your SDK "
156
+ + "resources?" );
157
+ scheduleRefresh (cachedSupplier );
158
+ return ;
159
+ }
160
+
83
161
try {
84
- cachedSupplier .get ();
85
- } finally {
162
+ EXECUTOR .execute (() -> {
163
+ try {
164
+ Thread .currentThread ().setName (asyncThreadName );
165
+ cachedSupplier .get ();
166
+ } catch (Exception e ) {
167
+ log .error (() -> "Background refresh failed: " + e .getMessage (), e );
168
+ } finally {
169
+ BACKGROUND_REFRESH_LEASE .release ();
170
+ scheduleRefresh (cachedSupplier );
171
+ }
172
+ });
173
+ } catch (Throwable t ) {
174
+ BACKGROUND_REFRESH_LEASE .release ();
175
+ log .warn (() -> "Failed to submit a background refresh task." , t );
86
176
scheduleRefresh (cachedSupplier );
87
177
}
88
178
}, asyncRefreshFrequency .toMillis (), MILLISECONDS );
@@ -93,7 +183,7 @@ public void prefetch(Runnable valueUpdater) {
93
183
// Only run one async refresh at a time.
94
184
if (currentlyRefreshing .compareAndSet (false , true )) {
95
185
try {
96
- executor .submit (() -> {
186
+ EXECUTOR .submit (() -> {
97
187
try {
98
188
valueUpdater .run ();
99
189
} catch (RuntimeException e ) {
@@ -111,6 +201,6 @@ public void prefetch(Runnable valueUpdater) {
111
201
112
202
@ Override
113
203
public void close () {
114
- executor . shutdown () ;
204
+ shutdown = true ;
115
205
}
116
206
}
0 commit comments