diff --git a/.changes/next-release/feature-AWSSDKforJavav2-36b8c47.json b/.changes/next-release/feature-AWSSDKforJavav2-36b8c47.json
new file mode 100644
index 000000000000..0e596ac23b9f
--- /dev/null
+++ b/.changes/next-release/feature-AWSSDKforJavav2-36b8c47.json
@@ -0,0 +1,6 @@
+{
+ "type": "feature",
+ "category": "AWS SDK for Java v2",
+ "contributor": "",
+ "description": "Share background refresh threads across async credential providers to reduce base SDK resource consumption."
+}
diff --git a/.changes/next-release/feature-AWSSDKforJavav2-8f6d13b.json b/.changes/next-release/feature-AWSSDKforJavav2-8f6d13b.json
new file mode 100644
index 000000000000..9ebc013a312a
--- /dev/null
+++ b/.changes/next-release/feature-AWSSDKforJavav2-8f6d13b.json
@@ -0,0 +1,6 @@
+{
+ "type": "feature",
+ "category": "AWS SDK for Java v2",
+ "contributor": "",
+ "description": "Log a warning when an extreme number of async credential providers are running in parallel, because it could indicate that the user is not closing their clients or credential providers when they are done using them."
+}
diff --git a/.changes/next-release/feature-AWSSDKforJavav2-d28e3ad.json b/.changes/next-release/feature-AWSSDKforJavav2-d28e3ad.json
new file mode 100644
index 000000000000..fe12d5e49282
--- /dev/null
+++ b/.changes/next-release/feature-AWSSDKforJavav2-d28e3ad.json
@@ -0,0 +1,6 @@
+{
+ "type": "feature",
+ "category": "AWS SDK for Java v2",
+ "contributor": "",
+ "description": "Jitter credential provider cache refresh times."
+}
diff --git a/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/HttpCredentialsProvider.java b/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/HttpCredentialsProvider.java
index b6d153a38d40..ccc7e7aa7101 100644
--- a/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/HttpCredentialsProvider.java
+++ b/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/HttpCredentialsProvider.java
@@ -28,12 +28,11 @@
public interface HttpCredentialsProvider extends AwsCredentialsProvider, SdkAutoCloseable {
interface Builder> {
/**
- * Configure whether this provider should fetch credentials asynchronously in the background. If this is true, threads are
- * less likely to block when {@link #resolveCredentials()} is called, but additional resources are used to maintain the
- * provider.
+ * Configure whether the provider should fetch credentials asynchronously in the background. If this is true,
+ * threads are less likely to block when credentials are loaded, but additional resources are used to maintain
+ * the provider.
*
- *
- * By default, this is disabled.
+ *
By default, this is disabled.
*/
BuilderT asyncCredentialUpdateEnabled(Boolean asyncCredentialUpdateEnabled);
diff --git a/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProvider.java b/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProvider.java
index 4f5caa90e890..ae1a13abeb16 100644
--- a/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProvider.java
+++ b/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProvider.java
@@ -16,12 +16,12 @@
package software.amazon.awssdk.auth.credentials;
import static java.time.temporal.ChronoUnit.MINUTES;
-import static java.time.temporal.ChronoUnit.SECONDS;
-import static software.amazon.awssdk.utils.ComparableUtils.minimum;
+import static software.amazon.awssdk.utils.ComparableUtils.maximum;
import java.io.IOException;
import java.net.URI;
import java.time.Clock;
+import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
@@ -151,7 +151,7 @@ private RefreshResult refreshCredentials() {
// Choose whether to report this failure at the debug or warn level based on how much time is left on the
// credentials before expiration.
Supplier errorMessage = () -> "Failure encountered when attempting to refresh credentials from IMDS.";
- Instant fifteenMinutesFromNow = Instant.now().plus(15, MINUTES);
+ Instant fifteenMinutesFromNow = clock.instant().plus(15, MINUTES);
if (expiration.isBefore(fifteenMinutesFromNow)) {
log.warn(errorMessage, e);
} else {
@@ -164,7 +164,7 @@ private RefreshResult refreshCredentials() {
}
return RefreshResult.builder(credentials.getAwsCredentials())
- .staleTime(null) // Allow use of expired credentials - they may still work
+ .staleTime(Instant.MAX) // Allow use of expired credentials - they may still work
.prefetchTime(prefetchTime(credentials.getExpiration().orElse(null)))
.build();
}
@@ -180,35 +180,18 @@ private boolean isLocalCredentialLoadingDisabled() {
private Instant prefetchTime(Instant expiration) {
Instant now = clock.instant();
- // If expiration time doesn't exist, refresh in 60 minutes
if (expiration == null) {
return now.plus(60, MINUTES);
}
- // If expiration time is 60+ minutes from now, refresh in 30 minutes.
- Instant sixtyMinutesBeforeExpiration = expiration.minus(60, MINUTES);
- if (now.isBefore(sixtyMinutesBeforeExpiration)) {
- return now.plus(30, MINUTES);
+ Duration timeUntilExpiration = Duration.between(now, expiration);
+ if (timeUntilExpiration.isNegative()) {
+ log.warn(() -> "IMDS credential expiration has been extended due to an IMDS availability outage. A refresh "
+ + "of these credentials will be attempted again in ~5 minutes.");
+ return now.plus(5, MINUTES);
}
- // If expiration time is 15 minutes or more from now, refresh in 10 minutes.
- Instant fifteenMinutesBeforeExpiration = expiration.minus(15, MINUTES);
- if (now.isBefore(fifteenMinutesBeforeExpiration)) {
- return now.plus(10, MINUTES);
- }
-
- // If expiration time is 0.25-15 minutes from now, refresh in 5 minutes, or 15 seconds before expiration, whichever is
- // sooner.
- Instant fifteenSecondsBeforeExpiration = expiration.minus(15, SECONDS);
- if (now.isBefore(fifteenSecondsBeforeExpiration)) {
- return minimum(now.plus(5, MINUTES), fifteenSecondsBeforeExpiration);
- }
-
- // These credentials are expired. Try refreshing again in 5 minutes. We can't be more aggressive than that, because we
- // don't want to overload the IMDS endpoint.
- log.warn(() -> "IMDS credential expiration has been extended due to an IMDS availability outage. A refresh "
- + "of these credentials will be attempted again in 5 minutes.");
- return now.plus(5, MINUTES);
+ return now.plus(maximum(timeUntilExpiration.abs().dividedBy(2), Duration.ofMinutes(5)));
}
@Override
diff --git a/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/ProcessCredentialsProvider.java b/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/ProcessCredentialsProvider.java
index a9f4f8942d24..7c69d1780d8a 100644
--- a/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/ProcessCredentialsProvider.java
+++ b/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/ProcessCredentialsProvider.java
@@ -249,8 +249,9 @@ private Builder(ProcessCredentialsProvider provider) {
}
/**
- * Configure whether the provider should fetch credentials asynchronously in the background. If this is true, threads are
- * less likely to block when credentials are loaded, but additional resources are used to maintain the provider.
+ * Configure whether the provider should fetch credentials asynchronously in the background. If this is true,
+ * threads are less likely to block when credentials are loaded, but additional resources are used to maintain
+ * the provider.
*
* By default, this is disabled.
*/
diff --git a/core/auth/src/test/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProviderTest.java b/core/auth/src/test/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProviderTest.java
index 0578165c17e9..25f06f047400 100644
--- a/core/auth/src/test/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProviderTest.java
+++ b/core/auth/src/test/java/software/amazon/awssdk/auth/credentials/InstanceProfileCredentialsProviderTest.java
@@ -41,6 +41,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
@@ -361,10 +362,10 @@ public void resolveCredentials_callsImdsIfCredentialsWithin5MinutesOfExpiration(
stubCredentialsResponse(aResponse().withBody(successfulCredentialsResponse1));
AwsCredentials credentials24HoursAgo = credentialsProvider.resolveCredentials();
- // Set the time to 3 minutes before expiration, and fail to call IMDS
- clock.time = now.minus(3, MINUTES);
+ // Set the time to 10 minutes before expiration, and fail to call IMDS
+ clock.time = now.minus(10, MINUTES);
stubCredentialsResponse(aResponse().withStatus(500));
- AwsCredentials credentials3MinutesAgo = credentialsProvider.resolveCredentials();
+ AwsCredentials credentials10MinutesAgo = credentialsProvider.resolveCredentials();
// Set the time to 10 seconds before expiration, and verify that we still call IMDS to try to get credentials in at the
// last moment before expiration
@@ -372,11 +373,47 @@ public void resolveCredentials_callsImdsIfCredentialsWithin5MinutesOfExpiration(
stubCredentialsResponse(aResponse().withBody(successfulCredentialsResponse2));
AwsCredentials credentials10SecondsAgo = credentialsProvider.resolveCredentials();
- assertThat(credentials24HoursAgo).isEqualTo(credentials3MinutesAgo);
+ assertThat(credentials24HoursAgo).isEqualTo(credentials10MinutesAgo);
assertThat(credentials24HoursAgo.secretAccessKey()).isEqualTo("SECRET_ACCESS_KEY");
assertThat(credentials10SecondsAgo.secretAccessKey()).isEqualTo("SECRET_ACCESS_KEY2");
}
+ @Test
+ public void imdsCallFrequencyIsLimited() {
+ // Requires running the test multiple times to account for refresh jitter
+ for (int i = 0; i < 10; i++) {
+ AdjustableClock clock = new AdjustableClock();
+ AwsCredentialsProvider credentialsProvider = credentialsProviderWithClock(clock);
+ Instant now = Instant.now();
+ String successfulCredentialsResponse1 =
+ "{"
+ + "\"AccessKeyId\":\"ACCESS_KEY_ID\","
+ + "\"SecretAccessKey\":\"SECRET_ACCESS_KEY\","
+ + "\"Expiration\":\"" + DateUtils.formatIso8601Date(now) + '"'
+ + "}";
+
+ String successfulCredentialsResponse2 =
+ "{"
+ + "\"AccessKeyId\":\"ACCESS_KEY_ID2\","
+ + "\"SecretAccessKey\":\"SECRET_ACCESS_KEY2\","
+ + "\"Expiration\":\"" + DateUtils.formatIso8601Date(now.plus(6, HOURS)) + '"'
+ + "}";
+
+ // Set the time to 5 minutes before expiration and call IMDS
+ clock.time = now.minus(5, MINUTES);
+ stubCredentialsResponse(aResponse().withBody(successfulCredentialsResponse1));
+ AwsCredentials credentials5MinutesAgo = credentialsProvider.resolveCredentials();
+
+ // Set the time to 1 second before expiration, and verify that do not call IMDS because it hasn't been 5 minutes yet
+ clock.time = now.minus(1, SECONDS);
+ stubCredentialsResponse(aResponse().withBody(successfulCredentialsResponse2));
+ AwsCredentials credentials1SecondsAgo = credentialsProvider.resolveCredentials();
+
+ assertThat(credentials5MinutesAgo).isEqualTo(credentials1SecondsAgo);
+ assertThat(credentials5MinutesAgo.secretAccessKey()).isEqualTo("SECRET_ACCESS_KEY");
+ }
+ }
+
private AwsCredentialsProvider credentialsProviderWithClock(Clock clock) {
InstanceProfileCredentialsProvider.BuilderImpl builder =
(InstanceProfileCredentialsProvider.BuilderImpl) InstanceProfileCredentialsProvider.builder();
diff --git a/services/sso/src/main/java/software/amazon/awssdk/services/sso/auth/SsoCredentialsProvider.java b/services/sso/src/main/java/software/amazon/awssdk/services/sso/auth/SsoCredentialsProvider.java
index 2f19a1fba8db..4b2cff64c71a 100644
--- a/services/sso/src/main/java/software/amazon/awssdk/services/sso/auth/SsoCredentialsProvider.java
+++ b/services/sso/src/main/java/software/amazon/awssdk/services/sso/auth/SsoCredentialsProvider.java
@@ -20,7 +20,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
@@ -38,23 +37,16 @@
import software.amazon.awssdk.utils.cache.RefreshResult;
/**
- *
- * An implementation of {@link AwsCredentialsProvider} that is extended within this package to provide support for
- * periodically updating session credentials. This credential provider maintains a {@link Supplier}
- * for a {@link SsoClient#getRoleCredentials(Consumer)} call to retrieve the credentials needed.
- *
+ * An implementation of {@link AwsCredentialsProvider} that periodically sends a {@link GetRoleCredentialsRequest} to the AWS
+ * Single Sign-On Service to maintain short-lived sessions to use for authentication. These sessions are updated using a single
+ * calling thread (by default) or asynchronously (if {@link Builder#asyncCredentialUpdateEnabled(Boolean)} is set).
*
- *
- * While creating the {@link GetRoleCredentialsRequest}, an access token is needed to be resolved from a token file.
- * In default, the token is assumed unexpired, and if it's expired then an {@link ExpiredTokenException} will be thrown.
- * If the users want to change the behavior of this, please implement your own token resolving logic and override the
- * {@link Builder#refreshRequest).
- *
+ * If the credentials are not successfully updated before expiration, calls to {@link #resolveCredentials()} will block until
+ * they are updated successfully.
*
- *
- * When credentials get close to expiration, this class will attempt to update them asynchronously. If the credentials
- * end up expiring, this class will block all calls to {@link #resolveCredentials()} until the credentials can be updated.
- *
+ * Users of this provider must {@link #close()} it when they are finished using it.
+ *
+ * This is created using {@link SsoCredentialsProvider#builder()}.
*/
@SdkPublicApi
public final class SsoCredentialsProvider implements AwsCredentialsProvider, SdkAutoCloseable,
@@ -186,7 +178,10 @@ public interface Builder extends CopyableBuilderBy default, this is 5 minutes.
*/
diff --git a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleCredentialsProvider.java b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleCredentialsProvider.java
index e4a04c6bb390..a67ed53b5766 100644
--- a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleCredentialsProvider.java
+++ b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleCredentialsProvider.java
@@ -30,12 +30,13 @@
/**
* An implementation of {@link AwsCredentialsProvider} that periodically sends an {@link AssumeRoleRequest} to the AWS
- * Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated asynchronously
- * in the background as they get close to expiring. If the credentials are not successfully updated asynchronously in the
- * background, calls to {@link #resolveCredentials()} will begin to block in an attempt to update the credentials synchronously.
+ * Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated using a single
+ * calling thread (by default) or asynchronously (if {@link Builder#asyncCredentialUpdateEnabled(Boolean)} is set).
*
- * This provider creates a thread in the background to periodically update credentials. If this provider is no longer needed,
- * the background thread can be shut down using {@link #close()}.
+ * If the credentials are not successfully updated before expiration, calls to {@link #resolveCredentials()} will block until
+ * they are updated successfully.
+ *
+ * Users of this provider must {@link #close()} it when they are finished using it.
*
* This is created using {@link StsAssumeRoleCredentialsProvider#builder()}.
*/
diff --git a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithSamlCredentialsProvider.java b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithSamlCredentialsProvider.java
index 2b5e9404896c..5b80af1b14d6 100644
--- a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithSamlCredentialsProvider.java
+++ b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithSamlCredentialsProvider.java
@@ -29,14 +29,14 @@
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
/**
- * An implementation of {@link AwsCredentialsProvider} that periodically sends a {@link AssumeRoleWithSamlRequest}
- * to the AWS Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated
- * asynchronously in the background as they get close to expiring. If the credentials are not successfully updated asynchronously
- * in the background, calls to {@link #resolveCredentials()} will begin to block in an attempt to update the credentials
- * synchronously.
+ * An implementation of {@link AwsCredentialsProvider} that periodically sends an {@link AssumeRoleWithSamlRequest} to the AWS
+ * Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated using a single
+ * calling thread (by default) or asynchronously (if {@link Builder#asyncCredentialUpdateEnabled(Boolean)} is set).
*
- * This provider creates a thread in the background to periodically update credentials. If this provider is no longer needed,
- * the background thread can be shut down using {@link #close()}.
+ * If the credentials are not successfully updated before expiration, calls to {@link #resolveCredentials()} will block until
+ * they are updated successfully.
+ *
+ * Users of this provider must {@link #close()} it when they are finished using it.
*
* This is created using {@link StsAssumeRoleWithSamlCredentialsProvider#builder()}.
*/
diff --git a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithWebIdentityCredentialsProvider.java b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithWebIdentityCredentialsProvider.java
index 7673fc421a1a..de467c27e5a0 100644
--- a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithWebIdentityCredentialsProvider.java
+++ b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsAssumeRoleWithWebIdentityCredentialsProvider.java
@@ -30,16 +30,16 @@
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
/**
- * An implementation of {@link AwsCredentialsProvider} that periodically sends a {@link AssumeRoleWithWebIdentityRequest}
- * to the AWS Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated
- * asynchronously in the background as they get close to expiring. If the credentials are not successfully updated asynchronously
- * in the background, calls to {@link #resolveCredentials()} will begin to block in an attempt to update the credentials
- * synchronously.
+ * An implementation of {@link AwsCredentialsProvider} that periodically sends an {@link AssumeRoleWithWebIdentityRequest} to the
+ * AWS Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated using a
+ * single calling thread (by default) or asynchronously (if {@link Builder#asyncCredentialUpdateEnabled(Boolean)} is set).
*
- * This provider creates a thread in the background to periodically update credentials. If this provider is no longer needed,
- * the background thread can be shut down using {@link #close()}.
+ * If the credentials are not successfully updated before expiration, calls to {@link #resolveCredentials()} will block until
+ * they are updated successfully.
*
- * This is created using {@link StsAssumeRoleWithWebIdentityCredentialsProvider#builder()}.
+ * Users of this provider must {@link #close()} it when they are finished using it.
+ *
+ * This is created using {@link #builder()}.
*/
@SdkPublicApi
@ThreadSafe
diff --git a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsCredentialsProvider.java b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsCredentialsProvider.java
index cf55bd27ac30..c09aa0c4dc38 100644
--- a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsCredentialsProvider.java
+++ b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsCredentialsProvider.java
@@ -37,9 +37,13 @@
/**
* An implementation of {@link AwsCredentialsProvider} that is extended within this package to provide support for periodically-
- * updating session credentials. When credentials get close to expiration, this class will attempt to update them asynchronously
- * using {@link #getUpdatedCredentials(StsClient)}. If the credentials end up expiring, this class will block all calls to
- * {@link #resolveCredentials()} until the credentials can be updated.
+ * updating session credentials.
+ *
+ * When credentials get close to expiration, this class will attempt to update them automatically either with a single calling
+ * thread (by default) or asynchronously (if {@link #asyncCredentialUpdateEnabled} is true). If the credentials expire, this
+ * class will block all calls to {@link #resolveCredentials()} until the credentials are updated.
+ *
+ * Users of this provider must {@link #close()} it when they are finished using it.
*/
@ThreadSafe
@SdkInternalApi
@@ -49,12 +53,12 @@ abstract class StsCredentialsProvider implements AwsCredentialsProvider, SdkAuto
private static final Duration DEFAULT_PREFETCH_TIME = Duration.ofMinutes(5);
/**
- * The STS client that should be used for periodically updating the session credentials in the background.
+ * The STS client that should be used for periodically updating the session credentials.
*/
final StsClient stsClient;
/**
- * The session cache that will update the credentials asynchronously in the background when they get close to expiring.
+ * The session cache that handles automatically updating the credentials when they get close to expiring.
*/
private final CachedSupplier sessionCache;
@@ -174,7 +178,7 @@ public B asyncCredentialUpdateEnabled(Boolean asyncCredentialUpdateEnabled) {
/**
* Configure the amount of time, relative to STS token expiration, that the cached credentials are considered
- * stale and should no longer be used. All threads will block until the value is updated.
+ * stale and must be updated. All threads will block until the value is updated.
*
* By default, this is 1 minute.
*/
@@ -186,7 +190,10 @@ public B staleTime(Duration staleTime) {
/**
* Configure the amount of time, relative to STS token expiration, that the cached credentials are considered
- * close to stale and should be updated. See {@link #asyncCredentialUpdateEnabled}.
+ * close to stale and should be updated.
+ *
+ * Prefetch updates will occur between the specified time and the stale time of the provider. Prefetch updates may be
+ * asynchronous. See {@link #asyncCredentialUpdateEnabled}.
*
* By default, this is 5 minutes.
*/
diff --git a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetFederationTokenCredentialsProvider.java b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetFederationTokenCredentialsProvider.java
index c39af85222bc..4406b0ae47f6 100644
--- a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetFederationTokenCredentialsProvider.java
+++ b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetFederationTokenCredentialsProvider.java
@@ -28,16 +28,16 @@
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
/**
- * An implementation of {@link AwsCredentialsProvider} that periodically sends a {@link GetFederationTokenRequest} to the
- * AWS Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated
- * asynchronously in the background as they get close to expiring. If the credentials are not successfully updated asynchronously
- * in the background, calls to {@link #resolveCredentials()} will begin to block in an attempt to update the credentials
- * synchronously.
+ * An implementation of {@link AwsCredentialsProvider} that periodically sends a {@link GetFederationTokenRequest} to the AWS
+ * Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated using a single
+ * calling thread (by default) or asynchronously (if {@link Builder#asyncCredentialUpdateEnabled(Boolean)} is set).
*
- * This provider creates a thread in the background to periodically update credentials. If this provider is no longer needed,
- * the background thread can be shut down using {@link #close()}.
+ * If the credentials are not successfully updated before expiration, calls to {@link #resolveCredentials()} will block until
+ * they are updated successfully.
*
- * This is created using {@link StsGetFederationTokenCredentialsProvider#builder()}.
+ * Users of this provider must {@link #close()} it when they are finished using it.
+ *
+ * This is created using {@link #builder()}.
*/
@SdkPublicApi
@ThreadSafe
diff --git a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetSessionTokenCredentialsProvider.java b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetSessionTokenCredentialsProvider.java
index ae66d10619aa..ac56c560bef5 100644
--- a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetSessionTokenCredentialsProvider.java
+++ b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsGetSessionTokenCredentialsProvider.java
@@ -29,14 +29,15 @@
/**
* An implementation of {@link AwsCredentialsProvider} that periodically sends a {@link GetSessionTokenRequest} to the AWS
- * Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated asynchronously
- * in the background as they get close to expiring. If the credentials are not successfully updated asynchronously in the
- * background, calls to {@link #resolveCredentials()} will begin to block in an attempt to update the credentials synchronously.
+ * Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated using a single
+ * calling thread (by default) or asynchronously (if {@link Builder#asyncCredentialUpdateEnabled(Boolean)} is set).
*
- * This provider creates a thread in the background to periodically update credentials. If this provider is no longer needed,
- * the background thread can be shut down using {@link #close()}.
+ * If the credentials are not successfully updated before expiration, calls to {@link #resolveCredentials()} will block until
+ * they are updated successfully.
*
- * This is created using {@link StsGetSessionTokenCredentialsProvider#builder()}.
+ * Users of this provider must {@link #close()} it when they are finished using it.
+ *
+ * This is created using {@link #builder()}.
*/
@SdkPublicApi
@ThreadSafe
diff --git a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsWebIdentityTokenFileCredentialsProvider.java b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsWebIdentityTokenFileCredentialsProvider.java
index 7b80caf6f503..ce98d97f59d0 100644
--- a/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsWebIdentityTokenFileCredentialsProvider.java
+++ b/services/sts/src/main/java/software/amazon/awssdk/services/sts/auth/StsWebIdentityTokenFileCredentialsProvider.java
@@ -31,29 +31,24 @@
import software.amazon.awssdk.services.sts.internal.AssumeRoleWithWebIdentityRequestSupplier;
import software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
import software.amazon.awssdk.services.sts.model.Credentials;
-import software.amazon.awssdk.services.sts.model.IdpCommunicationErrorException;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
/**
- * A credential provider that will read web identity token file path, aws role arn, aws session name from system properties or
- * environment variables and StsClient for using web identity token credentials with STS
- *
- * StsWebIdentityTokenFileCredentialsProvider allows passing of a custom StsClient at the time of instantiation of the provider.
- * The user needs to make sure that this StsClient handles {@link IdpCommunicationErrorException} in retry policy.
- *
- *
- * This Web Identity Token File Credentials Provider extends {@link StsCredentialsProvider} that supports periodically
- * updating session credentials. Refer {@link StsCredentialsProvider} for more details.
- *
- * STWebIdentityTokenFileCredentialsProvider differs from StsWebIdentityTokenFileCredentialsProvider which is in sts package:
- *
- * - This Credentials Provider supports custom StsClient
- * - This Credentials Provider supports periodically updating session credentials. Refer {@link StsCredentialsProvider}
- *
- * If asyncCredentialUpdateEnabled is set to true then this provider creates a thread in the background to periodically update
- * credentials. If this provider is no longer needed, the background thread can be shut down using {@link #close()}.
- * @see StsCredentialsProvider
+ * An implementation of {@link AwsCredentialsProvider} that periodically sends an {@link AssumeRoleWithWebIdentityRequest} to the
+ * AWS Security Token Service to maintain short-lived sessions to use for authentication. These sessions are updated using a
+ * single calling thread (by default) or asynchronously (if {@link Builder#asyncCredentialUpdateEnabled(Boolean)} is set).
+ *
+ * Unlike {@link StsAssumeRoleWithWebIdentityCredentialsProvider}, this reads the web identity information, including AWS role
+ * ARN, AWS session name and the location of a web identity token file from system properties and environment variables. The
+ * web identity token file is expected to contain the web identity token to use with each request.
+ *
+ * If the credentials are not successfully updated before expiration, calls to {@link #resolveCredentials()} will block until
+ * they are updated successfully.
+ *
+ * Users of this provider must {@link #close()} it when they are finished using it.
+ *
+ * This is created using {@link #builder()}.
*/
@SdkPublicApi
public final class StsWebIdentityTokenFileCredentialsProvider
diff --git a/utils/pom.xml b/utils/pom.xml
index 11967c7bdd6e..b620b4637c41 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -94,6 +94,27 @@
reactive-streams-tck
test
+
+ org.apache.logging.log4j
+ log4j-api
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ test
+
+
+ org.slf4j
+ jcl-over-slf4j
+ test
+ ${slf4j.version}
+
diff --git a/utils/src/main/java/software/amazon/awssdk/utils/ComparableUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/ComparableUtils.java
index eb9ea323d18a..e872903262ab 100644
--- a/utils/src/main/java/software/amazon/awssdk/utils/ComparableUtils.java
+++ b/utils/src/main/java/software/amazon/awssdk/utils/ComparableUtils.java
@@ -57,4 +57,15 @@ public static > T minimum(T... values) {
return values == null ? null : Stream.of(values).min(Comparable::compareTo).orElse(null);
}
+ /**
+ * Get the maximum value from a list of comparable vales.
+ *
+ * @param values The values from which the maximum should be extracted.
+ * @return The maximum value in the list.
+ */
+ @SafeVarargs
+ public static > T maximum(T... values) {
+ return values == null ? null : Stream.of(values).max(Comparable::compareTo).orElse(null);
+ }
+
}
diff --git a/utils/src/main/java/software/amazon/awssdk/utils/cache/CachedSupplier.java b/utils/src/main/java/software/amazon/awssdk/utils/cache/CachedSupplier.java
index 47a928fcabbb..4a57a020969b 100644
--- a/utils/src/main/java/software/amazon/awssdk/utils/cache/CachedSupplier.java
+++ b/utils/src/main/java/software/amazon/awssdk/utils/cache/CachedSupplier.java
@@ -15,14 +15,18 @@
package software.amazon.awssdk.utils.cache;
+import static java.time.temporal.ChronoUnit.MINUTES;
+
import java.time.Duration;
import java.time.Instant;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkProtectedApi;
+import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.utils.SdkAutoCloseable;
import software.amazon.awssdk.utils.Validate;
@@ -37,13 +41,18 @@
* This should be created using {@link #builder(Supplier)}.
*/
@SdkProtectedApi
-public final class CachedSupplier implements Supplier, SdkAutoCloseable {
+public class CachedSupplier implements Supplier, SdkAutoCloseable {
/**
* Maximum time to wait for a blocking refresh lock before calling refresh again. This is to rate limit how many times we call
* refresh. In the ideal case, refresh always occurs in a timely fashion and only one thread actually does the refresh.
*/
private static final Duration BLOCKING_REFRESH_MAX_WAIT = Duration.ofSeconds(5);
+ /**
+ * Random instance used for jittering refresh results.
+ */
+ private static final Random JITTER_RANDOM = new Random();
+
/**
* Used as a primitive form of rate limiting for the speed of our refreshes. This will make sure that the backing supplier has
* a period of time to update the value when the {@link RefreshResult#staleTime()} arrives without getting called by every
@@ -62,6 +71,11 @@ public final class CachedSupplier implements Supplier, SdkAutoCloseable {
*/
private final AtomicBoolean prefetchStrategyInitialized = new AtomicBoolean(false);
+ /**
+ * Whether jitter is enabled on the prefetch duration (can be disabled for testing).
+ */
+ private final boolean prefetchJitterEnabled;
+
/**
* The value currently stored in this cache.
*/
@@ -76,8 +90,9 @@ public final class CachedSupplier implements Supplier, SdkAutoCloseable {
private final Supplier> valueSupplier;
private CachedSupplier(Builder builder) {
- this.valueSupplier = Validate.notNull(builder.supplier, "builder.supplier");
+ this.valueSupplier = jitteredValueSupplier(Validate.notNull(builder.supplier, "builder.supplier"));
this.prefetchStrategy = Validate.notNull(builder.prefetchStrategy, "builder.prefetchStrategy");
+ this.prefetchJitterEnabled = Validate.notNull(builder.prefetchJitterEnabled, "builder.prefetchJitterEnabled");
}
/**
@@ -107,7 +122,7 @@ private boolean cacheIsStale() {
if (cachedValue.staleTime() == null) {
return false;
}
- return Instant.now().isAfter(cachedValue.staleTime());
+ return !Instant.now().isBefore(cachedValue.staleTime());
}
/**
@@ -118,7 +133,7 @@ private boolean shouldInitiateCachePrefetch() {
if (cachedValue.prefetchTime() == null) {
return false;
}
- return Instant.now().isAfter(cachedValue.prefetchTime());
+ return !Instant.now().isBefore(cachedValue.prefetchTime());
}
/**
@@ -146,7 +161,7 @@ private void refreshCache() {
}
// It wasn't, call the supplier to update it.
- cachedValue = valueSupplier.get();
+ cachedValue = prefetchStrategy.fetch(valueSupplier);
}
} finally {
if (lockAcquired) {
@@ -163,6 +178,49 @@ private void handleInterruptedException(String message, InterruptedException cau
throw new IllegalStateException(message, cause);
}
+ /**
+ * Wrap a value supplier with one that jitters its prefetch time.
+ */
+ private Supplier> jitteredValueSupplier(Supplier> supplier) {
+ return () -> {
+ RefreshResult result = supplier.get();
+
+ if (!prefetchJitterEnabled || result.prefetchTime() == null) {
+ return result;
+ }
+
+ Duration maxJitter = getMaxJitter(result);
+ if (maxJitter.isZero()) {
+ return result;
+ }
+
+ long jitter = Math.abs(JITTER_RANDOM.nextLong() % maxJitter.toMillis());
+ Instant newPrefetchTime = result.prefetchTime().plusMillis(jitter);
+ return RefreshResult.builder(result.value())
+ .prefetchTime(newPrefetchTime)
+ .staleTime(result.staleTime())
+ .build();
+ };
+ }
+
+ private Duration getMaxJitter(RefreshResult result) {
+ Instant staleTime = result.staleTime() != null ? result.staleTime() : Instant.MAX;
+ Instant oneMinuteBeforeStale = staleTime.minus(1, MINUTES);
+ if (!result.prefetchTime().isBefore(oneMinuteBeforeStale)) {
+ return Duration.ZERO;
+ }
+
+ Duration timeBetweenPrefetchAndStale = Duration.between(result.prefetchTime(), oneMinuteBeforeStale);
+ if (timeBetweenPrefetchAndStale.toDays() > 365) {
+ // The value will essentially never become stale. The user is likely using this for a value that should be
+ // periodically refreshed on a best-effort basis. Use a 5-minute jitter range to respect their requested
+ // prefetch time.
+ return Duration.ofMinutes(5);
+ }
+
+ return timeBetweenPrefetchAndStale;
+ }
+
/**
* Free any resources consumed by the prefetch strategy this supplier is using.
*/
@@ -177,6 +235,7 @@ public void close() {
public static final class Builder {
private final Supplier> supplier;
private PrefetchStrategy prefetchStrategy = new OneCallerBlocks();
+ private Boolean prefetchJitterEnabled = true;
private Builder(Supplier> supplier) {
this.supplier = supplier;
@@ -194,6 +253,15 @@ public Builder prefetchStrategy(PrefetchStrategy prefetchStrategy) {
return this;
}
+ /**
+ * Whether jitter is enabled on the prefetch time. Can be disabled for testing.
+ */
+ @SdkTestInternalApi
+ Builder prefetchJitterEnabled(Boolean prefetchJitterEnabled) {
+ this.prefetchJitterEnabled = prefetchJitterEnabled;
+ return this;
+ }
+
/**
* Create a {@link CachedSupplier} using the current configuration of this builder.
*/
@@ -215,6 +283,14 @@ public interface PrefetchStrategy extends SdkAutoCloseable {
*/
void prefetch(Runnable valueUpdater);
+ /**
+ * Invoke the provided supplier to retrieve the refresh result. This is useful for prefetch strategies to override when
+ * they care about the refresh result.
+ */
+ default RefreshResult fetch(Supplier> supplier) {
+ return supplier.get();
+ }
+
/**
* Invoked when the prefetch strategy is registered with a {@link CachedSupplier}.
*/
diff --git a/utils/src/main/java/software/amazon/awssdk/utils/cache/NonBlocking.java b/utils/src/main/java/software/amazon/awssdk/utils/cache/NonBlocking.java
index ecf85be00041..fbd8cc8fc116 100644
--- a/utils/src/main/java/software/amazon/awssdk/utils/cache/NonBlocking.java
+++ b/utils/src/main/java/software/amazon/awssdk/utils/cache/NonBlocking.java
@@ -15,17 +15,22 @@
package software.amazon.awssdk.utils.cache;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
import java.time.Duration;
-import java.util.concurrent.ScheduledExecutorService;
+import java.time.Instant;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
-import software.amazon.awssdk.utils.Validate;
/**
* A {@link CachedSupplier.PrefetchStrategy} that will run a single thread in the background to update the value. A call to
@@ -37,80 +42,196 @@
public class NonBlocking implements CachedSupplier.PrefetchStrategy {
private static final Logger log = Logger.loggerFor(NonBlocking.class);
+ /**
+ * The maximum number of concurrent refreshes before we start logging warnings and skipping refreshes.
+ */
+ private static final int MAX_CONCURRENT_REFRESHES = 100;
+
+ /**
+ * The semaphore around concurrent background refreshes, enforcing the {@link #MAX_CONCURRENT_REFRESHES}.
+ */
+ private static final Semaphore CONCURRENT_REFRESH_LEASES = new Semaphore(MAX_CONCURRENT_REFRESHES);
+
+ /**
+ * Thread used to kick off refreshes during the prefetch window. This does not do the actual refreshing. That's left for
+ * the {@link #EXECUTOR}.
+ */
+ private static final ScheduledThreadPoolExecutor SCHEDULER =
+ new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().threadNamePrefix("sdk-cache-scheduler")
+ .daemonThreads(true)
+ .build());
+
+ /**
+ * Threads used to do the actual work of refreshing the values (because the cached supplier might block, so we don't
+ * want the work to be done by a small thread pool). This executor is created as unbounded, but we start complaining and
+ * skipping refreshes when there are more than {@link #MAX_CONCURRENT_REFRESHES} running.
+ */
+ private static final ThreadPoolExecutor EXECUTOR =
+ new ThreadPoolExecutor(1, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new ThreadFactoryBuilder().threadNamePrefix("sdk-cache")
+ .daemonThreads(true)
+ .build());
+
+ /**
+ * An incrementing number, used to uniquely identify an instance of NonBlocking in the {@link #asyncThreadName}.
+ */
+ private static final AtomicLong INSTANCE_NUMBER = new AtomicLong(0);
+
/**
* Whether we are currently refreshing the supplier. This is used to make sure only one caller is blocking at a time.
*/
- private final AtomicBoolean currentlyRefreshing = new AtomicBoolean(false);
+ private final AtomicBoolean currentlyPrefetching = new AtomicBoolean(false);
+
+ /**
+ * Name of the thread refreshing the cache for this strategy.
+ */
+ private final String asyncThreadName;
+
+ /**
+ * The refresh task currently scheduled for this non-blocking instance. We ensure that no more than one task is scheduled
+ * per instance.
+ */
+ private final AtomicReference> refreshTask = new AtomicReference<>();
/**
- * How frequently to automatically refresh the supplier in the background.
+ * Whether this strategy has been shutdown (and should stop doing background refreshes)
*/
- private final Duration asyncRefreshFrequency;
+ private volatile boolean shutdown = false;
/**
- * Single threaded executor to asynchronous refresh the value.
+ * The cached supplier using this non-blocking instance.
*/
- private final ScheduledExecutorService executor;
+ private volatile CachedSupplier> cachedSupplier;
+
+ static {
+ // Ensure that cancelling a task actually removes it from the queue.
+ SCHEDULER.setRemoveOnCancelPolicy(true);
+ }
/**
* Create a non-blocking prefetch strategy that uses the provided value for the name of the background thread that will be
* performing the update.
*/
public NonBlocking(String asyncThreadName) {
- this(asyncThreadName, Duration.ofMinutes(1));
+ this.asyncThreadName = asyncThreadName + "-" + INSTANCE_NUMBER.getAndIncrement();
}
@SdkTestInternalApi
- NonBlocking(String asyncThreadName, Duration asyncRefreshFrequency) {
- this.executor = newExecutor(asyncThreadName);
- this.asyncRefreshFrequency = asyncRefreshFrequency;
+ static ThreadPoolExecutor executor() {
+ return EXECUTOR;
}
- private static ScheduledExecutorService newExecutor(String asyncThreadName) {
- Validate.paramNotBlank(asyncThreadName, "asyncThreadName");
- return new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().daemonThreads(true)
- .threadNamePrefix(asyncThreadName)
- .build());
+ @Override
+ public void initializeCachedSupplier(CachedSupplier> cachedSupplier) {
+ this.cachedSupplier = cachedSupplier;
}
@Override
- public void initializeCachedSupplier(CachedSupplier> cachedSupplier) {
- scheduleRefresh(cachedSupplier);
+ public void prefetch(Runnable valueUpdater) {
+ // Only run one async prefetch at a time.
+ if (currentlyPrefetching.compareAndSet(false, true)) {
+ tryRunBackgroundTask(valueUpdater, () -> currentlyPrefetching.set(false));
+ }
}
- private void scheduleRefresh(CachedSupplier> cachedSupplier) {
- executor.schedule(() -> {
- try {
- cachedSupplier.get();
- } finally {
- scheduleRefresh(cachedSupplier);
- }
- }, asyncRefreshFrequency.toMillis(), MILLISECONDS);
+ @Override
+ public RefreshResult fetch(Supplier> supplier) {
+ RefreshResult result = supplier.get();
+ schedulePrefetch(result);
+ return result;
+ }
+
+ private void schedulePrefetch(RefreshResult> result) {
+ if (shutdown || result.staleTime() == null || result.prefetchTime() == null) {
+ return;
+ }
+
+ Duration timeUntilPrefetch = Duration.between(Instant.now(), result.prefetchTime());
+ if (timeUntilPrefetch.isNegative() || timeUntilPrefetch.toDays() > 7) {
+ log.debug(() -> "Skipping background refresh because the prefetch time is in the past or too far in the future: " +
+ result.prefetchTime());
+ return;
+ }
+
+ Instant backgroundRefreshTime = result.prefetchTime().plusSeconds(1);
+ Duration timeUntilBackgroundRefresh = timeUntilPrefetch.plusSeconds(1);
+
+ log.debug(() -> "Scheduling refresh attempt for " + backgroundRefreshTime + " (in " +
+ timeUntilBackgroundRefresh.toMillis() + " ms)");
+
+ ScheduledFuture> scheduledTask = SCHEDULER.schedule(() -> {
+ runWithInstanceThreadName(() -> {
+ log.debug(() -> "Executing refresh attempt scheduled for " + backgroundRefreshTime);
+
+ // If the supplier has already been prefetched, this will just be a cache hit.
+ tryRunBackgroundTask(cachedSupplier::get);
+ });
+ }, timeUntilBackgroundRefresh.toMillis(), TimeUnit.MILLISECONDS);
+
+ updateTask(scheduledTask);
+
+ if (shutdown) {
+ updateTask(null);
+ }
}
@Override
- public void prefetch(Runnable valueUpdater) {
- // Only run one async refresh at a time.
- if (currentlyRefreshing.compareAndSet(false, true)) {
- try {
- executor.submit(() -> {
+ public void close() {
+ shutdown = true;
+ updateTask(null);
+ }
+
+ public void updateTask(ScheduledFuture> newTask) {
+ ScheduledFuture> currentTask;
+ do {
+ currentTask = refreshTask.get();
+ if (currentTask != null && !currentTask.isDone()) {
+ currentTask.cancel(false);
+ }
+ } while (!refreshTask.compareAndSet(currentTask, newTask));
+ }
+
+ public void tryRunBackgroundTask(Runnable runnable) {
+ tryRunBackgroundTask(runnable, () -> {
+ });
+ }
+
+ public void tryRunBackgroundTask(Runnable runnable, Runnable runOnCompletion) {
+ if (!CONCURRENT_REFRESH_LEASES.tryAcquire()) {
+ log.warn(() -> "Skipping a background refresh task because there are too many other tasks running.");
+ runOnCompletion.run();
+ return;
+ }
+
+ try {
+ EXECUTOR.submit(() -> {
+ runWithInstanceThreadName(() -> {
try {
- valueUpdater.run();
- } catch (RuntimeException e) {
- log.warn(() -> "Exception occurred in AWS SDK background task.", e);
+ runnable.run();
+ } catch (Throwable t) {
+ log.warn(() -> "Exception occurred in AWS SDK background task.", t);
} finally {
- currentlyRefreshing.set(false);
+ CONCURRENT_REFRESH_LEASES.release();
+ runOnCompletion.run();
}
});
- } catch (Throwable e) {
- currentlyRefreshing.set(false);
- throw e;
- }
+ });
+ } catch (Throwable t) {
+ log.warn(() -> "Exception occurred when submitting AWS SDK background task.", t);
+ CONCURRENT_REFRESH_LEASES.release();
+ runOnCompletion.run();
}
}
- @Override
- public void close() {
- executor.shutdown();
+ public void runWithInstanceThreadName(Runnable runnable) {
+ String baseThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName(baseThreadName + "-" + asyncThreadName);
+ runnable.run();
+ } finally {
+ Thread.currentThread().setName(baseThreadName);
+ }
}
}
diff --git a/utils/src/test/java/software/amazon/awssdk/utils/cache/CachedSupplierTest.java b/utils/src/test/java/software/amazon/awssdk/utils/cache/CachedSupplierTest.java
index ba5153cb6696..7b589c89a7a9 100644
--- a/utils/src/test/java/software/amazon/awssdk/utils/cache/CachedSupplierTest.java
+++ b/utils/src/test/java/software/amazon/awssdk/utils/cache/CachedSupplierTest.java
@@ -22,7 +22,6 @@
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
import java.io.Closeable;
-import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
@@ -198,6 +197,7 @@ public void oneCallerBlocksPrefetchStrategyWorks() throws InterruptedException {
try (WaitingSupplier waitingSupplier = new WaitingSupplier(future(), past())) {
CachedSupplier cachedSupplier = CachedSupplier.builder(waitingSupplier)
.prefetchStrategy(new OneCallerBlocks())
+ .prefetchJitterEnabled(false)
.build();
// Perform one successful "get" to prime the cache.
@@ -225,6 +225,7 @@ public void nonBlockingPrefetchStrategyWorks() {
try (WaitingSupplier waitingSupplier = new WaitingSupplier(future(), past());
CachedSupplier cachedSupplier = CachedSupplier.builder(waitingSupplier)
.prefetchStrategy(new NonBlocking("test-%s"))
+ .prefetchJitterEnabled(false)
.build()) {
// Perform one successful "get" to prime the cache.
waitingSupplier.permits.release(1);
@@ -243,10 +244,10 @@ public void nonBlockingPrefetchStrategyWorks() {
@Test
public void nonBlockingPrefetchStrategyRefreshesInBackground() {
- try (WaitingSupplier waitingSupplier = new WaitingSupplier(future(), past());
+ try (WaitingSupplier waitingSupplier = new WaitingSupplier(now().plusSeconds(62), now().plusSeconds(1));
CachedSupplier cachedSupplier = CachedSupplier.builder(waitingSupplier)
- .prefetchStrategy(new NonBlocking("test-%s",
- Duration.ofSeconds(1)))
+ .prefetchStrategy(new NonBlocking("test-%s"))
+ .prefetchJitterEnabled(false)
.build()) {
waitingSupplier.permits.release(2);
cachedSupplier.get();
@@ -258,12 +259,25 @@ public void nonBlockingPrefetchStrategyRefreshesInBackground() {
}
}
+ @Test
+ public void nonBlockingPrefetchStrategyHasOneMinuteMinimumByDefault() {
+ try (WaitingSupplier waitingSupplier = new WaitingSupplier(now(), now());
+ CachedSupplier cachedSupplier = CachedSupplier.builder(waitingSupplier)
+ .prefetchStrategy(new NonBlocking("test-%s"))
+ .build()) {
+ waitingSupplier.permits.release(2);
+ cachedSupplier.get();
+
+ // Ensure two "get"s happens even though we only made one call to the cached supplier.
+ assertThat(invokeSafely(() -> waitingSupplier.startedGetPermits.tryAcquire(2, 2, TimeUnit.SECONDS))).isFalse();
+ }
+ }
+
@Test
public void nonBlockingPrefetchStrategyBackgroundRefreshesHitCache() throws InterruptedException {
try (WaitingSupplier waitingSupplier = new WaitingSupplier(future(), future());
CachedSupplier cachedSupplier = CachedSupplier.builder(waitingSupplier)
- .prefetchStrategy(new NonBlocking("test-%s",
- Duration.ofMillis(1)))
+ .prefetchStrategy(new NonBlocking("test-%s"))
.build()) {
waitingSupplier.permits.release(5);
cachedSupplier.get();
@@ -278,8 +292,7 @@ public void nonBlockingPrefetchStrategyBackgroundRefreshesHitCache() throws Inte
public void nonBlockingPrefetchStrategyDoesNotRefreshUntilItIsCalled() throws InterruptedException {
try (WaitingSupplier waitingSupplier = new WaitingSupplier(future(), past());
CachedSupplier cachedSupplier = CachedSupplier.builder(waitingSupplier)
- .prefetchStrategy(new NonBlocking("test-%s",
- Duration.ofMillis(1)))
+ .prefetchStrategy(new NonBlocking("test-%s"))
.build()) {
waitingSupplier.startedGetPermits.release();
@@ -289,6 +302,77 @@ public void nonBlockingPrefetchStrategyDoesNotRefreshUntilItIsCalled() throws In
}
}
+ @Test
+ public void threadsAreSharedBetweenNonBlockingInstances() throws InterruptedException {
+ List> css = new ArrayList<>();
+ try {
+ // Create 99 concurrent non-blocking instances
+ for (int i = 0; i < 99; i++) {
+ CachedSupplier supplier =
+ CachedSupplier.builder(() -> RefreshResult.builder("foo")
+ .prefetchTime(now().plusMillis(1))
+ .staleTime(future())
+ .build())
+ .prefetchStrategy(new NonBlocking("test"))
+ .prefetchJitterEnabled(false)
+ .build();
+ supplier.get();
+ css.add(supplier);
+ }
+
+ int maxActive = 0;
+ for (int i = 0; i < 1000; i++) {
+ maxActive = Math.max(maxActive, NonBlocking.executor().getActiveCount());
+ Thread.sleep(1);
+ }
+
+ // Make sure we used less-than 99 to do the refreshes.
+ assertThat(maxActive).isBetween(1, 99);
+ } finally {
+ css.forEach(CachedSupplier::close);
+ }
+ }
+
+ @Test
+ public void activeThreadsHaveMaxCount() throws InterruptedException {
+ ExecutorService executor = Executors.newCachedThreadPool();
+ List> css = new ArrayList<>();
+ try {
+ // Create 99 concurrent non-blocking instances
+ for (int i = 0; i < 1000; i++) {
+ CachedSupplier supplier =
+ CachedSupplier.builder(() -> {
+ invokeSafely(() -> Thread.sleep(100));
+ return RefreshResult.builder("foo")
+ .prefetchTime(now().plusMillis(1))
+ .staleTime(now().plusSeconds(60))
+ .build();
+ }).prefetchStrategy(new NonBlocking("test"))
+ .prefetchJitterEnabled(false)
+ .build();
+ executor.submit(supplier::get);
+ css.add(supplier);
+ }
+
+ executor.shutdown();
+ assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
+
+ int maxActive = 0;
+ for (int i = 0; i < 1000; i++) {
+ maxActive = Math.max(maxActive, NonBlocking.executor().getActiveCount());
+ Thread.sleep(1);
+ }
+
+ // In a perfect world this would be capped to 100, but the mechanism we use to limit concurrent refreshes usually
+ // means more than 100 can get created. 150 should be a reasonable limit to check for, because without the limiter
+ // it would be ~1000.
+ assertThat(maxActive).isBetween(2, 150);
+ } finally {
+ css.forEach(CachedSupplier::close);
+ executor.shutdownNow();
+ }
+ }
+
/**
* Asynchronously perform a "get" on the provided supplier, returning the future that will be completed when the "get"
* finishes.
diff --git a/utils/src/test/resources/log4j2.properties b/utils/src/test/resources/log4j2.properties
new file mode 100644
index 000000000000..bcd864aa6982
--- /dev/null
+++ b/utils/src/test/resources/log4j2.properties
@@ -0,0 +1,24 @@
+#
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License.
+# A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0
+#
+# or in the "license" file accompanying this file. This file is distributed
+# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+status = warn
+
+appender.console.type = Console
+appender.console.name = ConsoleAppender
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n%throwable
+
+rootLogger.level = error
+rootLogger.appenderRef.stdout.ref = ConsoleAppender