From 8156055810ae5a8a572cd7650acfb44ab8d4e7a8 Mon Sep 17 00:00:00 2001 From: Abhi Gupta Date: Wed, 16 Jul 2025 14:55:15 +0530 Subject: [PATCH] Release 2.0.1 of the DynamoDB Streams Kinesis Adapter --- CHANGELOG.md | 8 +- README.md | 2 +- pom.xml | 85 +++++- .../AmazonDynamoDBStreamsAdapterClient.java | 46 ++- .../DynamoDBStreamsDataFetcher.java | 103 ++++++- .../DynamoDBStreamsShardDetector.java | 27 +- .../DynamoDBStreamsShardSyncer.java | 8 +- ...amoDBStreamsGetRecordsResponseAdapter.java | 11 +- .../DynamoDBStreamsRequestsBuilder.java | 46 +++ .../util/KinesisMapperUtil.java | 15 +- ...mazonDynamoDBStreamsAdapterClientTest.java | 87 ++++++ .../DynamoDBStreamsDataFetcherTest.java | 275 +++++++++++++++++- .../DynamoDBStreamsShardSyncerTest.java | 55 ++-- .../util/KinesisMapperUtilTest.java | 16 + 14 files changed, 730 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 167264d..423f2ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ -### Latest Release (v2.0.0) +### Latest Release (v2.0.1) +* Added support for optional shard filter parameter in DescribeStream api that allows customers to fetch child shards of a read_only parent shard. +* Fixes the [bug](https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/62) where dynamodb-streams-kinesis-adapter was not constructing stream arn properly for china aws partition. +* Bump aws sdk to 2.32.0 +### Release (v2.0.0) * Major version upgrade to support Amazon Kinesis Client Library (KCL) version 3.x * Upgrades AWS Java SDK to version 2.x, removing dependency on AWS SDK v1 * Provides a custom StreamsSchedulerFactory for creating a KCL scheduler optimized for DynamoDB Streams @@ -70,4 +74,4 @@ To fix high propagation delay problems, opt-into using DynamoDBStreamsProxy (ins adapterClient, amazonDynamoDB, amazonCloudWatchClient); -``` +``` \ No newline at end of file diff --git a/README.md b/README.md index d91ccd0..d5a60d6 100644 --- a/README.md +++ b/README.md @@ -60,4 +60,4 @@ See [CHANGELOG.md](CHANGELOG.md) [6]: https://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-setup.html [7]: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html [8]: http://mvnrepository.com/artifact/com.amazonaws/dynamodb-streams-kinesis-adapter -[9]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html +[9]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html \ No newline at end of file diff --git a/pom.xml b/pom.xml index 627b947..3677de6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ dynamodb-streams-kinesis-adapter jar DynamoDB Streams Adapter for Java - 2.0.0 + 2.0.1 The DynamoDB Streams Adapter implements the AmazonKinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream. https://aws.amazon.com/dynamodb @@ -23,10 +23,10 @@ - 3.1.0 + 3.1.1 true 3.0.0 - 2.26.19 + 2.32.0 @@ -242,6 +242,37 @@ + + org.apache.maven.plugins + maven-javadoc-plugin + 3.7.0 + + + attach-javadocs + + jar + + + + + + -Xdoclint:none + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar + + + + @@ -254,4 +285,50 @@ - + + + + central + https://ossrh-staging-api.central.sonatype.com/content/repositories/snapshots + + + central + https://ossrh-staging-api.central.sonatype.com/service/local/staging/deploy/maven2/ + + + + + + publishing + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.2.1 + + + sign-artifacts + verify + + sign + + + + + + org.sonatype.central + central-publishing-maven-plugin + 0.7.0 + true + + central + false + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java index 33ea876..8dda2c6 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java @@ -25,7 +25,10 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.ApiName; +import software.amazon.awssdk.core.RequestOverrideConfiguration; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy; @@ -34,6 +37,7 @@ import software.amazon.awssdk.retries.api.BackoffStrategy; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -46,8 +50,10 @@ import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; + import java.time.Duration; +import java.util.Collections; +import java.util.Optional; import java.util.concurrent.CompletableFuture; @Slf4j @@ -71,6 +77,7 @@ public enum SkipRecordsBehavior { private SkipRecordsBehavior skipRecordsBehavior = SkipRecordsBehavior.SKIP_RECORDS_TO_TRIM_HORIZON; private static final int MAX_DESCRIBE_STREAM_RETRY_ATTEMPTS = 50; private static final Duration DESCRIBE_STREAM_CALLS_DELAY = Duration.ofMillis(1000); + private static final String KCL_CONSUMER_ID_PREFIX = "KCL-ConsumerId"; private Region region; @@ -157,9 +164,10 @@ public void close() { public CompletableFuture describeStream( software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest describeStreamRequest) throws AwsServiceException, SdkClientException { + String consumerId = getConsumerId(describeStreamRequest.overrideConfiguration()); return CompletableFuture.supplyAsync(() -> { DescribeStreamRequest ddbDescribeStreamRequest = - DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder() + DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder(consumerId) .streamArn(describeStreamRequest.streamName()) .limit(describeStreamRequest.limit()) .exclusiveStartShardId(describeStreamRequest.exclusiveStartShardId()) @@ -175,6 +183,17 @@ public CompletableFuture describeStream( }); } + private String getConsumerId(Optional overrideConfiguration) { + return overrideConfiguration + .map(RequestOverrideConfiguration::apiNames) + .orElse(Collections.emptyList()) + .stream() + .map(ApiName::name) + .filter(name -> name.contains(KCL_CONSUMER_ID_PREFIX)) + .findFirst() + .orElse(""); + } + /** * Gets a shard iterator using the provided request from DynamoDB Streams. * @param getShardIteratorRequest Container for the necessary parameters to execute the GetShardIterator service @@ -189,8 +208,9 @@ public CompletableFuture getShardIterator(GetShardIter private GetShardIteratorResponse getShardIteratorResponse(GetShardIteratorRequest getShardIteratorRequest) throws AwsServiceException, SdkClientException { + String consumerId = getConsumerId(getShardIteratorRequest.overrideConfiguration()); software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest ddbGetShardIteratorRequest = - DynamoDBStreamsRequestsBuilder.getShardIteratorRequestBuilder() + DynamoDBStreamsRequestsBuilder.getShardIteratorRequestBuilder(consumerId) .streamArn(getShardIteratorRequest.streamName()) .shardIteratorType(getShardIteratorRequest.shardIteratorTypeAsString()) .shardId(getShardIteratorRequest.shardId()) @@ -271,7 +291,7 @@ public CompletableFuture getRecords(GetRecordsRequest getRec + " See getDynamoDBStreamsRecords function"); } - public CompletableFuture getDynamoDBStreamsRecords( + public CompletableFuture getDynamoDBStreamsRecords( software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest ddbGetRecordsRequest ) throws AwsServiceException, SdkClientException { return CompletableFuture.supplyAsync(() -> { @@ -286,6 +306,24 @@ public CompletableFuture getDynamoDBStreamsRecords( }); } + public DescribeStreamResponse describeStreamWithFilter(String streamArn, ShardFilter shardFilter, + String consumerId) { + DescribeStreamRequest describeStreamRequest = + DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder(consumerId) + .streamArn(streamArn) + .shardFilter(shardFilter) + .build(); + software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse describeStreamResponse; + try { + describeStreamResponse = internalClient.describeStream(describeStreamRequest); + } catch (AwsServiceException e) { + throw AmazonServiceExceptionTransformer.transformDynamoDBStreamsToKinesisDescribeStream(e); + } + return KinesisMapperUtil.convertDynamoDBDescribeStreamResponseToKinesisDescribeStreamResponse( + describeStreamResponse + ); + } + /** * Sets a value of {@link SkipRecordsBehavior} to decide how the application handles the case when records are lost. * Default = {@link SkipRecordsBehavior#SKIP_RECORDS_TO_TRIM_HORIZON} diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcher.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcher.java index 4f6788d..ee9ffa8 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcher.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcher.java @@ -19,6 +19,7 @@ import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsGetRecordsResponseAdapter; import com.amazonaws.services.dynamodbv2.streamsadapter.common.DynamoDBStreamsRequestsBuilder; import com.amazonaws.services.dynamodbv2.streamsadapter.util.KinesisMapperUtil; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import lombok.AccessLevel; import lombok.Data; @@ -28,12 +29,20 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.core.ApiName; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; +import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; +import software.amazon.awssdk.services.dynamodb.model.StreamStatus; +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.KinesisException; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -46,14 +55,18 @@ import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; +import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.polling.DataFetcher; import java.time.Duration; import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Implements fetching data from DynamoDB Streams using GetRecords and GetShardIterator API. @@ -63,6 +76,10 @@ public class DynamoDBStreamsDataFetcher implements DataFetcher { private static final String METRICS_PREFIX = "DynamoDBStreamsDataFetcher"; private static final String OPERATION = "ProcessTask"; + protected static final int MAX_DESCRIBE_STREAM_ATTEMPTS_FOR_CHILD_SHARD_DISCOVERY_ON_NO_RECORDS = 10; + private static final int DESCRIBE_STREAM_FOR_CHILD_SHARD_DISCOVERY_BACKOFF_ON_NO_RECORDS_MAX_DELAY_IN_MILLIS = 1000; + private static final int DESCRIBE_STREAM_FOR_CHILD_SHARD_DISCOVERY_BACKOFF_ON_NO_RECORDS_BASE_DELAY_IN_MILLIS = 50; + @NonNull private final AmazonDynamoDBStreamsAdapterClient amazonDynamoDBStreamsAdapterClient; @@ -94,6 +111,8 @@ public class DynamoDBStreamsDataFetcher implements DataFetcher { private InitialPositionInStreamExtended initialPositionInStream; + private String consumerId; + /** * Reusable {@link AWSExceptionManager}. *

@@ -158,6 +177,7 @@ public DynamoDBStreamsDataFetcher(@NotNull AmazonDynamoDBStreamsAdapterClient am KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(streamIdentifier.streamName()), shardId); this.maxFutureWait = dynamoDBStreamsDataFetcherProviderConfig.getKinesisRequestTimeout(); + this.consumerId = dynamoDBStreamsDataFetcherProviderConfig.consumerId(); } /** @@ -224,7 +244,13 @@ public void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExte GetShardIteratorRequest.Builder getShardIteratorRequestBuilder = GetShardIteratorRequest .builder() .streamName(createDynamoDBStreamsArnFromKinesisStreamName(streamIdentifier.streamName())) - .shardId(shardId); + .shardId(shardId) + .overrideConfiguration(AwsRequestOverrideConfiguration.builder() + .addApiName(ApiName.builder() + .name(consumerId) + .version(RetrievalConfig.KINESIS_CLIENT_LIB_USER_AGENT_VERSION) + .build()) + .build()); if (Objects.equals(ExtendedSequenceNumber.LATEST.sequenceNumber(), sequenceNumber)) { getShardIteratorRequestBuilder.shardIteratorType(ShardIteratorType.LATEST); @@ -327,7 +353,78 @@ public software.amazon.awssdk.services.kinesis.model.GetRecordsRequest getGetRec */ public GetRecordsResponseAdapter getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { - return amazonDynamoDBStreamsAdapterClient.getDynamoDBStreamsRecords(request).get(); + DynamoDBStreamsGetRecordsResponseAdapter getRecordsResponseAdapter = + amazonDynamoDBStreamsAdapterClient.getDynamoDBStreamsRecords(request).get(); + + // We have reached the end of the shard, call DescribeStream API with ShardFilter parameter + if (Objects.isNull(getRecordsResponseAdapter.nextShardIterator())) { + DescribeStreamResponse describeStreamResponse = getChildShards(streamIdentifier.streamName(), shardId); + if (describeStreamResponse != null) { + List childShards = describeStreamResponse.streamDescription() + .shards() + .stream() + .map(shard -> ChildShard.builder() + .shardId(shard.shardId()) + .parentShards( + Stream.of(shard.parentShardId(), shard.adjacentParentShardId()) + .filter(Objects::nonNull) + .collect(Collectors.toList())) + .hashKeyRange(shard.hashKeyRange()) + .build() + ) + .collect(Collectors.toList()); + getRecordsResponseAdapter.addChildShards(childShards); + } + } + return getRecordsResponseAdapter; + } + + @VisibleForTesting + protected DescribeStreamResponse getChildShards(String streamName, String shardId) throws InterruptedException { + int attempts = 0; + do { + try { + DescribeStreamResponse describeStreamResponse = amazonDynamoDBStreamsAdapterClient + .describeStreamWithFilter( + createDynamoDBStreamsArnFromKinesisStreamName(streamName), + ShardFilter.builder() + .type(ShardFilterType.CHILD_SHARDS) + .shardId(shardId) + .build(), + consumerId); + + if (!describeStreamResponse.streamDescription().shards().isEmpty()) { + return describeStreamResponse; + } + + // if stream is disabled and no child shards are found, we will not retry for this case. + if (StreamStatus.DISABLED.toString().equals( + describeStreamResponse.streamDescription().streamStatusAsString())) { + return null; + } + } catch (LimitExceededException e) { + log.error("Caught limit exceeded exception while getting child shards for stream and shard: {}", + streamAndShardId, e); + } catch (Exception e) { + // if there is any exception, fall back to paginated DescribeStream call for shard discovery + log.error("Caught exception while getting child shards from stream and shard: {}", + streamAndShardId, e); + return null; + } + attempts++; + // Calculate exponential backoff: 50ms, 100ms, 200ms, 400ms, 800ms, 1000ms, ... + long delayMillis = + Math.min(DESCRIBE_STREAM_FOR_CHILD_SHARD_DISCOVERY_BACKOFF_ON_NO_RECORDS_BASE_DELAY_IN_MILLIS + * (1L << attempts), + DESCRIBE_STREAM_FOR_CHILD_SHARD_DISCOVERY_BACKOFF_ON_NO_RECORDS_MAX_DELAY_IN_MILLIS); + + // Add jitter (±20% of delay) + long jitter = (long) (delayMillis * 0.2 * (Math.random() - 0.5) * 2); + delayMillis += jitter; + Thread.sleep(delayMillis); + } while (attempts < MAX_DESCRIBE_STREAM_ATTEMPTS_FOR_CHILD_SHARD_DISCOVERY_ON_NO_RECORDS); + log.error("Finding child shards for stream and shard: {} failed after {} attempts", streamAndShardId, attempts); + return null; } /** @@ -337,7 +434,7 @@ public GetRecordsResponseAdapter getGetRecordsResponse(GetRecordsRequest request * @return GetRecordsRequest. */ public GetRecordsRequest ddbGetRecordsRequest(String nextIterator) { - return DynamoDBStreamsRequestsBuilder.getRecordsRequestBuilder() + return DynamoDBStreamsRequestsBuilder.getRecordsRequestBuilder(consumerId) .shardIterator(nextIterator) .limit(maxRecords) .build(); diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardDetector.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardDetector.java index 54d82e0..6fcd8ff 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardDetector.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardDetector.java @@ -24,6 +24,8 @@ import lombok.Synchronized; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.core.ApiName; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; @@ -36,6 +38,8 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.retrieval.AWSExceptionManager; +import software.amazon.kinesis.retrieval.RetrievalConfig; + import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -149,7 +153,7 @@ public Shard shard(String shardId) { if (shard == null) { log.info("Too many shard map cache misses for stream: {} or " + "cache is out of date -- forcing a refresh", streamArn); - describeStream(null); + describeStream(null, ""); shard = cachedShardMap.get(shardId); if (shard == null) { @@ -180,12 +184,19 @@ public Shard shard(String shardId) { @Override @Synchronized public List listShards() { - DescribeStreamResult describeStreamResult = describeStream(null); + DescribeStreamResult describeStreamResult = describeStream(null, ""); + return describeStreamResult.getShards(); + } + + @Override + @Synchronized + public List listShards(String consumerId) { + DescribeStreamResult describeStreamResult = describeStream(null, consumerId); return describeStreamResult.getShards(); } @Synchronized - public DescribeStreamResult describeStream(String lastSeenShardId) { + public DescribeStreamResult describeStream(String lastSeenShardId, String consumerId) { ShardGraphTracker shardTracker = new ShardGraphTracker(); String exclusiveStartShardId = lastSeenShardId; DescribeStreamResult describeStreamResult = new DescribeStreamResult(); @@ -193,7 +204,7 @@ public DescribeStreamResult describeStream(String lastSeenShardId) { // Phase 1: Collect all shards from Paginations. do { - describeStreamResponse = describeStreamResponse(exclusiveStartShardId); + describeStreamResponse = describeStreamResponse(exclusiveStartShardId, consumerId); // Collect shards shardTracker.collectShards(describeStreamResponse.streamDescription().shards()); @@ -228,10 +239,16 @@ public DescribeStreamResult describeStream(String lastSeenShardId) { return describeStreamResult; } - private DescribeStreamResponse describeStreamResponse(String exclusiveStartShardId) { + private DescribeStreamResponse describeStreamResponse(String exclusiveStartShardId, String consumerId) { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(this.streamArn) .exclusiveStartShardId(exclusiveStartShardId) + .overrideConfiguration(AwsRequestOverrideConfiguration.builder() + .addApiName(ApiName.builder() + .name(consumerId) + .version(RetrievalConfig.KINESIS_CLIENT_LIB_USER_AGENT_VERSION) + .build()) + .build()) .build(); DescribeStreamResponse describeStreamResponse; diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java index 431c81e..b81e5af 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java @@ -164,7 +164,8 @@ private void syncShardLeases(@NonNull final ShardDetector shardDetector, throws ProvisionedThroughputException, InvalidStateException, DependencyException { LOG.info("syncShardLeases " + streamArn + ": begin"); long startTimeMillis = System.currentTimeMillis(); - List shards = getShardList(shardDetector); + String consumerId = leaseRefresher.getLeaseTableIdentifier(); + List shards = getShardList(shardDetector, consumerId); LOG.debug("Num shards " + streamArn + ": " + shards.size()); Map shardIdToShardMap = constructShardIdToShardMap(shards); Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap); @@ -209,11 +210,12 @@ private void syncShardLeases(@NonNull final ShardDetector shardDetector, LOG.info("syncShardLeases: " + streamArn + ": end"); } - private List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + private List getShardList(@NonNull final ShardDetector shardDetector, String consumerId) + throws KinesisClientLibIOException { // Fallback to existing behavior for backward compatibility List shardList = Collections.emptyList(); try { - shardList = shardDetector.listShards(); + shardList = shardDetector.listShards(consumerId); } catch (ResourceNotFoundException e) { if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) { deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier)); diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/adapter/DynamoDBStreamsGetRecordsResponseAdapter.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/adapter/DynamoDBStreamsGetRecordsResponseAdapter.java index 102b76f..dc26c9e 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/adapter/DynamoDBStreamsGetRecordsResponseAdapter.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/adapter/DynamoDBStreamsGetRecordsResponseAdapter.java @@ -19,16 +19,19 @@ import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.KinesisClientRecord; -import java.util.Collections; + +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; public class DynamoDBStreamsGetRecordsResponseAdapter implements GetRecordsResponseAdapter { private final GetRecordsResponse response; + private final List childShards; public DynamoDBStreamsGetRecordsResponseAdapter(GetRecordsResponse response) { this.response = response; + this.childShards = new ArrayList<>(); } @Override @@ -52,7 +55,7 @@ public Long millisBehindLatest() { @Override public List childShards() { - return Collections.emptyList(); + return this.childShards; } @Override @@ -64,4 +67,8 @@ public String nextShardIterator() { public String requestId() { return response.responseMetadata().requestId(); } + + public void addChildShards(List childShards) { + this.childShards.addAll(childShards); + } } diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/common/DynamoDBStreamsRequestsBuilder.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/common/DynamoDBStreamsRequestsBuilder.java index 0ca4cd7..68a2805 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/common/DynamoDBStreamsRequestsBuilder.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/common/DynamoDBStreamsRequestsBuilder.java @@ -45,6 +45,15 @@ public static ListStreamsRequest.Builder listStreamsRequestBuilder() { return appendUserAgent(ListStreamsRequest.builder()); } + /** + * Creates a builder for ListStreams request with user agent information. + * + * @return ListStreamsRequest builder with user agent configuration + */ + public static ListStreamsRequest.Builder listStreamsRequestBuilder(String consumerId) { + return appendUserAgent(ListStreamsRequest.builder(), consumerId); + } + /** * Creates a builder for GetRecords request with user agent information. * @@ -54,6 +63,15 @@ public static GetRecordsRequest.Builder getRecordsRequestBuilder() { return appendUserAgent(GetRecordsRequest.builder()); } + /** + * Creates a builder for GetRecords request with user agent information. + * + * @return GetRecordsRequest builder with user agent configuration + */ + public static GetRecordsRequest.Builder getRecordsRequestBuilder(String consumerId) { + return appendUserAgent(GetRecordsRequest.builder(), consumerId); + } + /** * Creates a builder for GetShardIterator request with user agent information. * @@ -63,6 +81,15 @@ public static GetShardIteratorRequest.Builder getShardIteratorRequestBuilder() { return appendUserAgent(GetShardIteratorRequest.builder()); } + /** + * Creates a builder for GetShardIterator request with user agent information. + * + * @return GetShardIteratorRequest builder with user agent configuration + */ + public static GetShardIteratorRequest.Builder getShardIteratorRequestBuilder(String consumerId) { + return appendUserAgent(GetShardIteratorRequest.builder(), consumerId); + } + /** * Creates a builder for DescribeStream request with user agent information. * @@ -72,6 +99,15 @@ public static DescribeStreamRequest.Builder describeStreamRequestBuilder() { return appendUserAgent(DescribeStreamRequest.builder()); } + /** + * Creates a builder for DescribeStream request with user agent information. + * + * @return DescribeStreamRequest builder with user agent configuration + */ + public static DescribeStreamRequest.Builder describeStreamRequestBuilder(String consumerId) { + return appendUserAgent(DescribeStreamRequest.builder(), consumerId); + } + @SuppressWarnings("unchecked") private static T appendUserAgent(final T builder) { return (T) builder.overrideConfiguration(AwsRequestOverrideConfiguration.builder() @@ -81,4 +117,14 @@ private static T appendUserAgent(final T builder) .build()) .build()); } + + @SuppressWarnings("unchecked") + private static T appendUserAgent(final T builder, String consumerId) { + return (T) builder.overrideConfiguration(AwsRequestOverrideConfiguration.builder() + .addApiName(ApiName.builder() + .name(String.format("%s-%s", consumerId, RetrievalConfig.KINESIS_CLIENT_LIB_USER_AGENT)) + .version(RetrievalConfig.KINESIS_CLIENT_LIB_USER_AGENT_VERSION) + .build()) + .build()); + } } diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtil.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtil.java index aa9ef72..0c94ca6 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtil.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtil.java @@ -17,6 +17,7 @@ import com.amazonaws.services.dynamodbv2.streamsadapter.serialization.RecordObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.model.Stream; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; @@ -29,6 +30,8 @@ import java.time.Duration; import java.time.Instant; import java.util.Base64; +import java.util.HashSet; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -51,6 +54,7 @@ private KinesisMapperUtil() {} private static final ObjectMapper MAPPER = new RecordObjectMapper(); private static final String SHARD_ID_SEPARATOR = "-"; + private static Set awsRegions = new HashSet<>(Region.regions()); /** * All the shard-leases should stay retained for at least 6 hours in the lease table. @@ -193,8 +197,15 @@ public static String createDynamoDBStreamsArnFromKinesisStreamName(String stream String accountId = parts[1]; String tableName = parts[2]; String streamLabel = parts[3].replace(COLON_REPLACEMENT, ":"); - String dynamoDBStreamArn = String.format("arn:aws:dynamodb:%s:%s:table/%s/stream/%s", region, accountId, - tableName, streamLabel); + Region awsRegion = Region.of(region); + + if (!awsRegions.contains(awsRegion)) { + throw new IllegalArgumentException("Invalid DynamoDB stream ARN format: " + streamNameToUse); + } + + String arnPartition = awsRegion.metadata().partition().id(); + String dynamoDBStreamArn = String.format("arn:%s:dynamodb:%s:%s:table/%s/stream/%s", + arnPartition, region, accountId, tableName, streamLabel); if (!isValidDynamoDBStreamArn(dynamoDBStreamArn)) { throw new IllegalArgumentException("Invalid DynamoDB stream ARN: " + dynamoDBStreamArn); } diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientTest.java index 7627df9..23fcd42 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClientTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -28,6 +29,10 @@ import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest; import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse; +import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; +import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; import software.amazon.awssdk.services.dynamodb.model.Stream; import software.amazon.awssdk.services.dynamodb.model.StreamDescription; import software.amazon.awssdk.services.dynamodb.model.StreamRecord; @@ -277,4 +282,86 @@ void testClose() { adapterClient.close(); verify(dynamoDbStreamsClient).close(); } + + @Test + void testDescribeStreamWithFilter() { + // Setup + String streamArn = STREAM_ARN; + String shardId = SHARD_ID; + + // Create ShardFilter for child shards + ShardFilter shardFilter = ShardFilter.builder() + .type(ShardFilterType.CHILD_SHARDS) + .shardId(shardId) + .build(); + + // Create DynamoDB response with shards + DescribeStreamResponse dynamoResponse = DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder() + .streamArn(streamArn) + .streamStatus(StreamStatus.ENABLED) + .shards(Arrays.asList( + Shard.builder() + .shardId("child-shard-1") + .parentShardId(shardId) + .sequenceNumberRange(SequenceNumberRange.builder() + .startingSequenceNumber("123") + .endingSequenceNumber("456") + .build()) + .build(), + Shard.builder() + .shardId("child-shard-2") + .parentShardId(shardId) + .sequenceNumberRange(SequenceNumberRange.builder() + .startingSequenceNumber("234") + .endingSequenceNumber("567") + .build()) + .build() + )) + .build()) + .build(); + + // Capture the request to verify filter is passed correctly + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DescribeStreamRequest.class); + when(dynamoDbStreamsClient.describeStream(requestCaptor.capture())) + .thenReturn(dynamoResponse); + + // Execute + software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse response = + adapterClient.describeStreamWithFilter(streamArn, shardFilter, "CONSUMER_ID"); + + // Verify + assertNotNull(response); + assertEquals(streamArn, response.streamDescription().streamName()); + assertEquals("ENABLED", response.streamDescription().streamStatusAsString()); + assertEquals(2, response.streamDescription().shards().size()); + + // Verify the request was constructed correctly + DescribeStreamRequest capturedRequest = requestCaptor.getValue(); + assertEquals(streamArn, capturedRequest.streamArn()); + assertEquals(ShardFilterType.CHILD_SHARDS, capturedRequest.shardFilter().type()); + assertEquals(shardId, capturedRequest.shardFilter().shardId()); + } + + @Test + void testDescribeStreamWithFilterException() { + // Setup + String streamArn = STREAM_ARN; + String shardId = SHARD_ID; + + // Create ShardFilter for child shards + ShardFilter shardFilter = ShardFilter.builder() + .type(ShardFilterType.CHILD_SHARDS) + .shardId(shardId) + .build(); + + // Mock exception from DynamoDB + RuntimeException dynamoException = new RuntimeException("Test exception"); + when(dynamoDbStreamsClient.describeStream(any(DescribeStreamRequest.class))) + .thenThrow(dynamoException); + + // Execute and verify exception is transformed + assertThrows(RuntimeException.class, () -> + adapterClient.describeStreamWithFilter(streamArn, shardFilter, "CONSUMER_ID")); + } } \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcherTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcherTest.java index 0bbd57b..65ee75e 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsDataFetcherTest.java @@ -17,14 +17,21 @@ import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsGetRecordsResponseAdapter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.core.ApiName; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.Record; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.ShardFilter; +import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; import software.amazon.awssdk.services.dynamodb.model.StreamStatus; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StreamDescription; @@ -37,14 +44,17 @@ import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; +import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.math.BigInteger; import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import static com.amazonaws.services.dynamodbv2.streamsadapter.DynamoDBStreamsDataFetcher.MAX_DESCRIBE_STREAM_ATTEMPTS_FOR_CHILD_SHARD_DISCOVERY_ON_NO_RECORDS; import static com.amazonaws.services.dynamodbv2.streamsadapter.util.KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName; import static com.amazonaws.services.dynamodbv2.streamsadapter.util.KinesisMapperUtil.createKinesisStreamIdentifierFromDynamoDBStreamsArn; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -54,6 +64,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class DynamoDBStreamsDataFetcherTest { @@ -70,13 +83,14 @@ public class DynamoDBStreamsDataFetcherTest { private StreamIdentifier streamIdentifier; private DataFetcherProviderConfig dataFetcherProviderConfig; private MetricsFactory metricsFactory; + public static String CONSUMER_ID = "consumer-id"; @BeforeEach void setup() { amazonDynamoDBStreamsAdapterClient = Mockito.mock(AmazonDynamoDBStreamsAdapterClient.class); streamIdentifier = StreamIdentifier.singleStreamInstance(STREAM_NAME); metricsFactory = new NullMetricsFactory(); - dataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(streamIdentifier, SHARD_ID, metricsFactory, MAX_RECORDS, Duration.ofMillis(30000L)); + dataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(streamIdentifier, SHARD_ID, metricsFactory, MAX_RECORDS, Duration.ofMillis(30000L), CONSUMER_ID); dynamoDBStreamsDataFetcher = new DynamoDBStreamsDataFetcher(amazonDynamoDBStreamsAdapterClient, dataFetcherProviderConfig); } @@ -138,7 +152,7 @@ void testDdbGetRecordsSuccess() { Record record1 = createRecord("1"); Record record2 = createRecord("2"); - GetRecordsResponseAdapter response = new DynamoDBStreamsGetRecordsResponseAdapter( + DynamoDBStreamsGetRecordsResponseAdapter response = new DynamoDBStreamsGetRecordsResponseAdapter( GetRecordsResponse.builder() .records(Arrays.asList(record1, record2)) .nextShardIterator("next-iterator") @@ -225,6 +239,12 @@ void testRestartIteratorWithoutInitialization() { private void mockGetShardIterator(String sequenceNumber, ShardIteratorType iteratorType, String iterator) { when(amazonDynamoDBStreamsAdapterClient.getShardIterator( GetShardIteratorRequest.builder() + .overrideConfiguration(AwsRequestOverrideConfiguration.builder() + .addApiName(ApiName.builder() + .name(CONSUMER_ID) + .version(RetrievalConfig.KINESIS_CLIENT_LIB_USER_AGENT_VERSION) + .build()) + .build()) .streamName(createDynamoDBStreamsArnFromKinesisStreamName(STREAM_NAME)) .shardId(SHARD_ID) .startingSequenceNumber(sequenceNumber) @@ -249,7 +269,6 @@ private Record createRecord(String sequenceNumber) { .build(); } - @Test void testGetDdbGetRecordsResponseWithShardEndAndDisabledStream() throws Exception { // Setup @@ -314,4 +333,254 @@ void testGetDdbGetRecordsResponseWithActiveIterator() throws Exception { assertTrue(result.records().isEmpty()); assertEquals("next-iterator", result.nextShardIterator()); } + + @Test + void testGetChildShardsWithSuccessfulResponse() throws Exception { + // Setup + String shardId = "shard-123"; + + // Create child shards for the response + Shard childShard1 = Shard.builder() + .shardId("child-shard-1") + .parentShardId(shardId) + .build(); + + Shard childShard2 = Shard.builder() + .shardId("child-shard-2") + .parentShardId(shardId) + .build(); + + // Create DescribeStream response with child shards + DescribeStreamResponse describeStreamResponse = DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder() + .streamName(STREAM_NAME) + .streamStatus(StreamStatus.ENABLED.toString()) + .shards(Arrays.asList( + software.amazon.awssdk.services.kinesis.model.Shard.builder() + .shardId(childShard1.shardId()) + .parentShardId(childShard1.parentShardId()) + .hashKeyRange(software.amazon.awssdk.services.kinesis.model.HashKeyRange.builder() + .startingHashKey(BigInteger.ZERO.toString()) + .endingHashKey(BigInteger.ONE.toString()) + .build()) + .build(), + software.amazon.awssdk.services.kinesis.model.Shard.builder() + .shardId(childShard2.shardId()) + .parentShardId(childShard2.parentShardId()) + .hashKeyRange(software.amazon.awssdk.services.kinesis.model.HashKeyRange.builder() + .startingHashKey(BigInteger.ZERO.toString()) + .endingHashKey(BigInteger.ONE.toString()) + .build()) + .build() + )) + .hasMoreShards(false) + .build()) + .build(); + + // Mock the describeStreamWithFilter method + ArgumentCaptor shardFilterCaptor = ArgumentCaptor.forClass(ShardFilter.class); + when(amazonDynamoDBStreamsAdapterClient.describeStreamWithFilter( + Mockito.eq(createDynamoDBStreamsArnFromKinesisStreamName(STREAM_NAME)), + shardFilterCaptor.capture(), anyString())) + .thenReturn(describeStreamResponse); + + // Execute + DescribeStreamResponse result = dynamoDBStreamsDataFetcher.getChildShards(STREAM_NAME, shardId); + + // Verify + assertNotNull(result); + assertEquals(2, result.streamDescription().shards().size()); + + // Verify the ShardFilter was correctly constructed + ShardFilter capturedFilter = shardFilterCaptor.getValue(); + assertEquals(ShardFilterType.CHILD_SHARDS, capturedFilter.type()); + assertEquals(shardId, capturedFilter.shardId()); + } + + @Test + void testGetChildShardsWithDisabledStream() throws Exception { + // Setup + String shardId = "shard-123"; + + // Create DescribeStream response for disabled stream with no shards + DescribeStreamResponse describeStreamResponse = DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder() + .streamName(STREAM_NAME) + .streamStatus(StreamStatus.DISABLED.toString()) + .shards(Collections.emptyList()) + .hasMoreShards(false) + .build()) + .build(); + + // Mock the describeStreamWithFilter method + when(amazonDynamoDBStreamsAdapterClient.describeStreamWithFilter( + Mockito.eq(createDynamoDBStreamsArnFromKinesisStreamName(STREAM_NAME)), + any(ShardFilter.class), anyString())) + .thenReturn(describeStreamResponse); + + // Execute + DescribeStreamResponse result = dynamoDBStreamsDataFetcher.getChildShards(STREAM_NAME, shardId); + + // Verify + assertNull(result); // Should return null for disabled stream with no child shards + } + + @Test + void testGetChildShardsWithException() throws Exception { + // Setup + String shardId = "shard-123"; + + // Mock the describeStreamWithFilter method to throw an exception + when(amazonDynamoDBStreamsAdapterClient.describeStreamWithFilter( + Mockito.eq(createDynamoDBStreamsArnFromKinesisStreamName(STREAM_NAME)), + any(ShardFilter.class), anyString())) + .thenThrow(new RuntimeException("Test exception")); + + // Execute + DescribeStreamResponse result = dynamoDBStreamsDataFetcher.getChildShards(STREAM_NAME, shardId); + + // Verify + assertNull(result); // Should return null when an exception occurs + } + + @Test + void testGetChildShardsWithLimitExceededExceptionExhaustsRetries() throws Exception { + // Setup + String shardId = "shard-123"; + + // Mock the describeStreamWithFilter method to throw an exception + when(amazonDynamoDBStreamsAdapterClient.describeStreamWithFilter( + Mockito.eq(createDynamoDBStreamsArnFromKinesisStreamName(STREAM_NAME)), + any(ShardFilter.class), anyString())) + .thenThrow(LimitExceededException.builder().message("LimitExceededException").build()); + + // Execute + DescribeStreamResponse result = dynamoDBStreamsDataFetcher.getChildShards(STREAM_NAME, shardId); + + // Verify + assertNull(result); // Should return null when an exception occurs + verify(amazonDynamoDBStreamsAdapterClient, + times(MAX_DESCRIBE_STREAM_ATTEMPTS_FOR_CHILD_SHARD_DISCOVERY_ON_NO_RECORDS)) + .describeStreamWithFilter(any(), any(), anyString()); + } + + @Test + void testGetChildShardsWithRetries() throws Exception { + // Setup + String shardId = "shard-123"; + + // First call returns empty shards (will trigger retry) + DescribeStreamResponse emptyResponse = DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder() + .streamName(STREAM_NAME) + .streamStatus(StreamStatus.ENABLED.toString()) + .shards(Collections.emptyList()) + .hasMoreShards(false) + .build()) + .build(); + + // Second call returns child shards + DescribeStreamResponse withShardsResponse = DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder() + .streamName(STREAM_NAME) + .streamStatus(StreamStatus.ENABLED.toString()) + .shards(Collections.singletonList( + software.amazon.awssdk.services.kinesis.model.Shard.builder() + .shardId("child-shard-1") + .parentShardId(shardId) + .build() + )) + .hasMoreShards(false) + .build()) + .build(); + + // Mock the describeStreamWithFilter method to return empty response first, then response with shards + when(amazonDynamoDBStreamsAdapterClient.describeStreamWithFilter( + Mockito.eq(createDynamoDBStreamsArnFromKinesisStreamName(STREAM_NAME)), + any(ShardFilter.class), anyString())) + .thenReturn(emptyResponse) + .thenReturn(withShardsResponse); + + // Execute + DescribeStreamResponse result = dynamoDBStreamsDataFetcher.getChildShards(STREAM_NAME, shardId); + + // Verify + assertNotNull(result); + assertEquals(1, result.streamDescription().shards().size()); + assertEquals("child-shard-1", result.streamDescription().shards().get(0).shardId()); + } + + @Test + void testGetDdbGetRecordsResponseWithShardEndAndChildShards() throws Exception { + // Setup + GetRecordsRequest request = GetRecordsRequest.builder() + .shardIterator("some-iterator") + .limit(100) + .build(); + + // Create GetRecordsResponse with null nextShardIterator (indicating shard end) + GetRecordsResponse recordsResponse = GetRecordsResponse.builder() + .records(Collections.emptyList()) + .nextShardIterator(null) + .build(); + + // Create DescribeStream response with child shards + DescribeStreamResponse describeStreamResponse = DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder() + .streamName(STREAM_NAME) + .streamStatus(StreamStatus.ENABLED.toString()) + .shards(Arrays.asList( + software.amazon.awssdk.services.kinesis.model.Shard.builder() + .shardId("child-shard-1") + .parentShardId(SHARD_ID) + .hashKeyRange(software.amazon.awssdk.services.kinesis.model.HashKeyRange.builder() + .startingHashKey("0") + .endingHashKey("499") + .build()) + .build(), + software.amazon.awssdk.services.kinesis.model.Shard.builder() + .shardId("child-shard-2") + .parentShardId(SHARD_ID) + .adjacentParentShardId("adjacent-shard") + .hashKeyRange(software.amazon.awssdk.services.kinesis.model.HashKeyRange.builder() + .startingHashKey("500") + .endingHashKey("999") + .build()) + .build() + )) + .hasMoreShards(false) + .build()) + .build(); + + // Mock responses + when(amazonDynamoDBStreamsAdapterClient.getDynamoDBStreamsRecords(any(GetRecordsRequest.class))) + .thenReturn(CompletableFuture.supplyAsync(() -> new DynamoDBStreamsGetRecordsResponseAdapter(recordsResponse))); + + // Mock the getChildShards method to return the response with child shards + DynamoDBStreamsDataFetcher spyFetcher = Mockito.spy(dynamoDBStreamsDataFetcher); + Mockito.doReturn(describeStreamResponse).when(spyFetcher).getChildShards( + Mockito.eq(STREAM_NAME), Mockito.eq(SHARD_ID)); + + // Execute + GetRecordsResponseAdapter result = spyFetcher.getGetRecordsResponse(request); + + // Verify + assertNotNull(result); + assertTrue(result.records().isEmpty()); + assertNull(result.nextShardIterator()); + + // Verify child shards were added + assertEquals(2, result.childShards().size()); + + // Verify first child shard + assertEquals("child-shard-1", result.childShards().get(0).shardId()); + assertEquals(1, result.childShards().get(0).parentShards().size()); + assertEquals(SHARD_ID, result.childShards().get(0).parentShards().get(0)); + + // Verify second child shard + assertEquals("child-shard-2", result.childShards().get(1).shardId()); + assertEquals(2, result.childShards().get(1).parentShards().size()); + assertEquals(SHARD_ID, result.childShards().get(1).parentShards().get(0)); + assertEquals("adjacent-shard", result.childShards().get(1).parentShards().get(1)); + } } diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncerTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncerTest.java index 9e8149d..cc2baec 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncerTest.java @@ -30,6 +30,7 @@ import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.MultiStreamLease; +import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -58,6 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -85,16 +87,18 @@ public class DynamoDBStreamsShardSyncerTest { private static final StreamIdentifier MULTI_STREAM_IDENTIFIER = StreamIdentifier.multiStreamInstance(MULTI_STREAM_NAME); @BeforeEach - void setup() { + void setup() throws DependencyException { MockitoAnnotations.openMocks(this); shardSyncer = new DynamoDBStreamsShardSyncer(false, SINGLE_STREAM_NAME, true); + when(leaseRefresher.getLeaseTableIdentifier()).thenReturn("CONSUMER_ID"); when(shardDetector.streamIdentifier()).thenReturn(SINGLE_STREAM_IDENTIFIER); } @Test void testCheckAndCreateLeaseForNewShardsWithEmptyShardList() throws Exception { // Setup - when(shardDetector.listShards()).thenReturn(Collections.emptyList()); + when(shardDetector.listShards(anyString())).thenReturn(Collections.emptyList()); + // Execute boolean result = shardSyncer.checkAndCreateLeaseForNewShards( shardDetector, @@ -118,7 +122,7 @@ void testCheckAndCreateLeaseForNewShardsWithSingleShard() throws Exception { List shards = Collections.singletonList(shard); // Mock both listShards and describeStream - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); when(leaseRefresher.listLeases()).thenReturn(Collections.emptyList()); // Execute @@ -171,7 +175,7 @@ void testMultiStreamModeWithSingleStreamHavingSingleShard() throws Exception { List shards = Collections.singletonList(shard); // Mock ShardDetector - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); when(shardDetector.streamIdentifier()).thenReturn(MULTI_STREAM_IDENTIFIER); // Mock empty existing leases @@ -227,12 +231,12 @@ void testMultiStreamModeWithMultipleStreams() throws Exception { // Setup first stream detector DynamoDBStreamsShardDetector detector1 = mock(DynamoDBStreamsShardDetector.class); - when(detector1.listShards()).thenReturn(Collections.singletonList(shard1)); + when(detector1.listShards(anyString())).thenReturn(Collections.singletonList(shard1)); when(detector1.streamIdentifier()).thenReturn(stream1Identifier); // Setup second stream detector DynamoDBStreamsShardDetector detector2 = mock(DynamoDBStreamsShardDetector.class); - when(detector2.listShards()).thenReturn(Collections.singletonList(shard2)); + when(detector2.listShards(anyString())).thenReturn(Collections.singletonList(shard2)); when(detector2.streamIdentifier()).thenReturn(stream2Identifier); // Mock lease refresher for multiple streams @@ -291,7 +295,7 @@ void testCheckpointInitializationForLatestPosition() throws Exception { Shard independentShard = createTestShard(independentShardId, null, null, "0", null); List shards = Arrays.asList(rootShard, childShard, independentShard); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); when(leaseRefresher.listLeases()).thenReturn(Collections.emptyList()); // Test with LATEST position @@ -382,8 +386,8 @@ void testMultiStreamSyncerWithMultipleStreams() throws Exception { DynamoDBStreamsShardDetector mockDetector1 = mock(DynamoDBStreamsShardDetector.class); DynamoDBStreamsShardDetector mockDetector2 = mock(DynamoDBStreamsShardDetector.class); - when(mockDetector1.listShards()).thenReturn(Collections.singletonList(shard1)); - when(mockDetector2.listShards()).thenReturn(Collections.singletonList(shard2)); + when(mockDetector1.listShards(anyString())).thenReturn(Collections.singletonList(shard1)); + when(mockDetector2.listShards(anyString())).thenReturn(Collections.singletonList(shard2)); when(mockDetector1.streamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance(stream1Identifier)); when(mockDetector2.streamIdentifier()).thenReturn(StreamIdentifier.singleStreamInstance(stream2Identifier)); @@ -392,6 +396,7 @@ void testMultiStreamSyncerWithMultipleStreams() throws Exception { LeaseRefresher mockLeaseRefresher = mock(LeaseRefresher.class); when(mockLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))) .thenReturn(Collections.emptyList()); + when(mockLeaseRefresher.getLeaseTableIdentifier()).thenReturn("CONSUMER_ID"); // Execute for both streams boolean result1 = multiStreamSyncer1.checkAndCreateLeaseForNewShards( @@ -474,7 +479,7 @@ void testLeaseCreationForComplexLineageSingleLease() throws Exception { List shards = Arrays.asList(rootShard, child1, child2, grandchild1, grandchild2, grandchild3, grandchild4, grandchild5); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); when(leaseRefresher.listLeases()).thenReturn(Collections.emptyList()); // Execute @@ -560,7 +565,7 @@ void testLeaseCreationForComplexLineageMultiLease() throws Exception { List shards = Arrays.asList(rootShard, child1, child2, grandchild1, grandchild2, grandchild3, grandchild4, grandchild5); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); when(shardDetector.streamIdentifier()).thenReturn(MULTI_STREAM_IDENTIFIER); when(leaseRefresher.listLeasesForStream(MULTI_STREAM_IDENTIFIER)).thenReturn(Collections.emptyList()); @@ -620,7 +625,7 @@ void testSingleStreamModeWithResourceNotFoundException() throws Exception { true, deletedStreamListProvider ); - when(shardDetector.listShards()).thenThrow(exception); + when(shardDetector.listShards(anyString())).thenThrow(exception); // Execute and verify exception is propagated assertTrue( @@ -632,14 +637,14 @@ void testSingleStreamModeWithResourceNotFoundException() throws Exception { false, true)); - verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShards(anyString()); verifyNoInteractions(deletedStreamListProvider); } @Test void testShardSyncThrowsKinesisClientLibExceptionIfShardListIsNull() throws Exception { - when(shardDetector.listShards()).thenReturn(null); + when(shardDetector.listShards(anyString())).thenReturn(null); assertThrows(KinesisClientLibIOException.class, () -> shardSyncer.checkAndCreateLeaseForNewShards( shardDetector, @@ -666,7 +671,7 @@ void testMultiStreamModeWithResourceNotFoundException() throws Exception { .message("Stream not found: " + MULTI_STREAM_NAME) .build(); - when(shardDetector.listShards()).thenThrow(exception); + when(shardDetector.listShards(anyString())).thenThrow(exception); when(shardDetector.streamIdentifier()).thenReturn(MULTI_STREAM_IDENTIFIER); // Execute - should not throw exception in multi-stream mode with DeletedStreamListProvider @@ -681,7 +686,7 @@ void testMultiStreamModeWithResourceNotFoundException() throws Exception { // Verify assertTrue(result); - verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShards(anyString()); // Verify the stream was added to the deleted streams list verify(deletedStreamListProvider).add(MULTI_STREAM_IDENTIFIER); @@ -747,7 +752,7 @@ void testLeaseCreationWithExistingLeaseAndAncestorRecursion() throws Exception { when(leaseRefresher.listLeases()).thenReturn(Arrays.asList(existingGrandparentLease, existingChild2Lease)); // Setup shard detector - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); // Execute with TRIM_HORIZON position boolean result = shardSyncer.checkAndCreateLeaseForNewShards( @@ -801,7 +806,7 @@ void testCheckAndCreateLeasesForNewShardsWithInconsistentShardsFailure() { List shards = Arrays.asList(parentShard, childShard); // Mock ShardDetector - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); // Test with ignoreUnexpectedChildShards = false assertThrows(KinesisClientLibIOException.class, () -> @@ -829,7 +834,7 @@ void testCheckAndCreateLeasesForNewShardsWithInconsistentShard() throws Exceptio List shards = Arrays.asList(parentShard, childShard); // Mock ShardDetector - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards(anyString())).thenReturn(shards); // Test with ignoreUnexpectedChildShards = true boolean result = shardSyncer.checkAndCreateLeaseForNewShards( @@ -910,7 +915,7 @@ void testGarbageLeaseCleanupMultiStreamMode() throws Exception { // Setup mocks when(shardDetector.streamIdentifier()).thenReturn(currentStreamId); - when(shardDetector.listShards()).thenReturn(currentShards); + when(shardDetector.listShards(anyString())).thenReturn(currentShards); when(leaseRefresher.listLeasesForStream(currentStreamId)) .thenReturn(Arrays.asList(currentActiveLease, currentStaleLease)); @@ -998,7 +1003,7 @@ void testMultiStreamLeaseCleanupForClosedShard() throws Exception { // Setup mocks when(shardDetector.streamIdentifier()).thenReturn(MULTI_STREAM_IDENTIFIER); - when(shardDetector.listShards()).thenReturn(currentShards); + when(shardDetector.listShards(anyString())).thenReturn(currentShards); when(leaseRefresher.listLeasesForStream(MULTI_STREAM_IDENTIFIER)).thenReturn(stream1Leases); DynamoDBStreamsShardSyncer multiStreamSyncer = new DynamoDBStreamsShardSyncer( @@ -1132,7 +1137,7 @@ void testLeaseRetentionPeriodCheckAndDontDelete() throws Exception { // Setup mocks when(shardDetector.streamIdentifier()).thenReturn(streamId); - when(shardDetector.listShards()).thenReturn(currentShards); + when(shardDetector.listShards(anyString())).thenReturn(currentShards); when(leaseRefresher.listLeasesForStream(streamId)).thenReturn(streamLeases); DynamoDBStreamsShardSyncer multiStreamSyncer = new DynamoDBStreamsShardSyncer( @@ -1266,7 +1271,7 @@ void testLeaseRetentionPeriodCheckAndDeleteGrandparent() throws Exception { // Setup mocks when(shardDetector.streamIdentifier()).thenReturn(SINGLE_STREAM_IDENTIFIER); - when(shardDetector.listShards()).thenReturn(currentShards); + when(shardDetector.listShards(anyString())).thenReturn(currentShards); when(leaseRefresher.listLeasesForStream(SINGLE_STREAM_IDENTIFIER)).thenReturn(streamLeases); DynamoDBStreamsShardSyncer multiStreamSyncer = new DynamoDBStreamsShardSyncer( @@ -1399,7 +1404,7 @@ void testLeaseRetentionPeriodGrandparentNotDeletedDueToActiveChild() throws Exce // Setup mocks when(shardDetector.streamIdentifier()).thenReturn(MULTI_STREAM_IDENTIFIER); - when(shardDetector.listShards()).thenReturn(currentShards); + when(shardDetector.listShards(anyString())).thenReturn(currentShards); when(leaseRefresher.listLeasesForStream(MULTI_STREAM_IDENTIFIER)).thenReturn(streamLeases); DynamoDBStreamsShardSyncer multiStreamSyncer = new DynamoDBStreamsShardSyncer( @@ -1487,7 +1492,7 @@ void testNoLeaseCleanupWhenCleanupLeasesOfCompletedShardsIsFalse() throws Except // Setup mocks when(shardDetector.streamIdentifier()).thenReturn(MULTI_STREAM_IDENTIFIER); - when(shardDetector.listShards()).thenReturn(currentShards); + when(shardDetector.listShards(anyString())).thenReturn(currentShards); when(leaseRefresher.listLeases()).thenReturn(currentLeases); diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtilTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtilTest.java index e917a96..402f659 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtilTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/util/KinesisMapperUtilTest.java @@ -116,6 +116,7 @@ void testCreateDynamoDBStreamsArnEdgeCases() { KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(emptyComponents)); } + @Test void testSingleStreamIdentifierConsistency() { // Test that stream identifiers remain consistent through multiple conversions String originalArn = "arn:aws:dynamodb:us-west-2:123456789012:table/TestTable/stream/2024-02-03T00:00:00.000"; @@ -125,6 +126,21 @@ void testSingleStreamIdentifierConsistency() { Assertions.assertEquals(originalArn, reconstructedArn); } + @Test + void testSingleStreamIdentifierConsistencyWithDifferentAwsPartition() { + // Test that stream identifiers remain consistent through multiple conversions + String originalArn = "arn:aws-cn:dynamodb:cn-north-1:123456789012:table/TestTable/stream/2024-02-03T00:00:00.000"; + String streamIdentifier = KinesisMapperUtil.createKinesisStreamIdentifierFromDynamoDBStreamsArn(originalArn, false); + String reconstructedArn = KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(streamIdentifier); + + Assertions.assertEquals(originalArn, reconstructedArn); + + String usGovArn = "arn:aws-us-gov:dynamodb:us-gov-east-1:123456789012:table/TestTable/stream/2024-02-03T00:00:00.000"; + String usGovStreamIdentifier = KinesisMapperUtil.createKinesisStreamIdentifierFromDynamoDBStreamsArn(usGovArn, false); + String usGovReconstructedArn = KinesisMapperUtil.createDynamoDBStreamsArnFromKinesisStreamName(usGovStreamIdentifier); + Assertions.assertEquals(usGovArn, usGovReconstructedArn); + } + @Test void testMultiStreamIdentifierConsistency() { // Test that stream identifiers remain consistent through multiple conversions