Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
### Latest Release (v2.0.0)
### Latest Release (v2.0.1)
* Added support for optional shard filter parameter in DescribeStream api that allows customers to fetch child shards of a read_only parent shard.
* Fixes the [bug](https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/62) where dynamodb-streams-kinesis-adapter was not constructing stream arn properly for china aws partition.
* Bump aws sdk to 2.32.0

### Release (v2.0.0)
* Major version upgrade to support Amazon Kinesis Client Library (KCL) version 3.x
* Upgrades AWS Java SDK to version 2.x, removing dependency on AWS SDK v1
* Provides a custom StreamsSchedulerFactory for creating a KCL scheduler optimized for DynamoDB Streams
Expand Down Expand Up @@ -70,4 +74,4 @@ To fix high propagation delay problems, opt-into using DynamoDBStreamsProxy (ins
adapterClient,
amazonDynamoDB,
amazonCloudWatchClient);
```
```
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ See [CHANGELOG.md](CHANGELOG.md)
[6]: https://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-setup.html
[7]: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html
[8]: http://mvnrepository.com/artifact/com.amazonaws/dynamodb-streams-kinesis-adapter
[9]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
[9]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
85 changes: 81 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
<packaging>jar</packaging>
<name>DynamoDB Streams Adapter for Java</name>
<version>2.0.0</version>
<version>2.0.1</version>
<description>The DynamoDB Streams Adapter implements the AmazonKinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream.</description>
<url>https://aws.amazon.com/dynamodb</url>

Expand All @@ -23,10 +23,10 @@
</licenses>

<properties>
<software-kinesis.version>3.1.0</software-kinesis.version>
<software-kinesis.version>3.1.1</software-kinesis.version>
<gpg.skip>true</gpg.skip>
<maven.dependency.version>3.0.0</maven.dependency.version>
<awssdk.version>2.26.19</awssdk.version>
<awssdk.version>2.32.0</awssdk.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -242,6 +242,37 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.7.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<additionalOptions>
<additionalOption>-Xdoclint:none</additionalOption>
</additionalOptions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand All @@ -254,4 +285,50 @@
</dependency>
</dependencies>
</dependencyManagement>
</project>

<distributionManagement>
<snapshotRepository>
<id>central</id>
<url>https://ossrh-staging-api.central.sonatype.com/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>central</id>
<url>https://ossrh-staging-api.central.sonatype.com/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>

<profiles>
<profile>
<id>publishing</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.central</groupId>
<artifactId>central-publishing-maven-plugin</artifactId>
<version>0.7.0</version>
<extensions>true</extensions>
<configuration>
<publishingServerId>central</publishingServerId>
<autoPublish>false</autoPublish>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ApiName;
import software.amazon.awssdk.core.RequestOverrideConfiguration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
Expand All @@ -34,6 +37,7 @@
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
Expand All @@ -46,8 +50,10 @@
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

@Slf4j
Expand All @@ -71,6 +77,7 @@ public enum SkipRecordsBehavior {
private SkipRecordsBehavior skipRecordsBehavior = SkipRecordsBehavior.SKIP_RECORDS_TO_TRIM_HORIZON;
private static final int MAX_DESCRIBE_STREAM_RETRY_ATTEMPTS = 50;
private static final Duration DESCRIBE_STREAM_CALLS_DELAY = Duration.ofMillis(1000);
private static final String KCL_CONSUMER_ID_PREFIX = "KCL-ConsumerId";

private Region region;

Expand Down Expand Up @@ -157,9 +164,10 @@ public void close() {
public CompletableFuture<DescribeStreamResponse> describeStream(
software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest describeStreamRequest)
throws AwsServiceException, SdkClientException {
String consumerId = getConsumerId(describeStreamRequest.overrideConfiguration());
return CompletableFuture.supplyAsync(() -> {
DescribeStreamRequest ddbDescribeStreamRequest =
DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder()
DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder(consumerId)
.streamArn(describeStreamRequest.streamName())
.limit(describeStreamRequest.limit())
.exclusiveStartShardId(describeStreamRequest.exclusiveStartShardId())
Expand All @@ -175,6 +183,17 @@ public CompletableFuture<DescribeStreamResponse> describeStream(
});
}

private String getConsumerId(Optional<AwsRequestOverrideConfiguration> overrideConfiguration) {
return overrideConfiguration
.map(RequestOverrideConfiguration::apiNames)
.orElse(Collections.emptyList())
.stream()
.map(ApiName::name)
.filter(name -> name.contains(KCL_CONSUMER_ID_PREFIX))
.findFirst()
.orElse("");
}

/**
* Gets a shard iterator using the provided request from DynamoDB Streams.
* @param getShardIteratorRequest Container for the necessary parameters to execute the GetShardIterator service
Expand All @@ -189,8 +208,9 @@ public CompletableFuture<GetShardIteratorResponse> getShardIterator(GetShardIter

private GetShardIteratorResponse getShardIteratorResponse(GetShardIteratorRequest getShardIteratorRequest)
throws AwsServiceException, SdkClientException {
String consumerId = getConsumerId(getShardIteratorRequest.overrideConfiguration());
software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest ddbGetShardIteratorRequest =
DynamoDBStreamsRequestsBuilder.getShardIteratorRequestBuilder()
DynamoDBStreamsRequestsBuilder.getShardIteratorRequestBuilder(consumerId)
.streamArn(getShardIteratorRequest.streamName())
.shardIteratorType(getShardIteratorRequest.shardIteratorTypeAsString())
.shardId(getShardIteratorRequest.shardId())
Expand Down Expand Up @@ -271,7 +291,7 @@ public CompletableFuture<GetRecordsResponse> getRecords(GetRecordsRequest getRec
+ " See getDynamoDBStreamsRecords function");
}

public CompletableFuture<GetRecordsResponseAdapter> getDynamoDBStreamsRecords(
public CompletableFuture<DynamoDBStreamsGetRecordsResponseAdapter> getDynamoDBStreamsRecords(
software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest ddbGetRecordsRequest
) throws AwsServiceException, SdkClientException {
return CompletableFuture.supplyAsync(() -> {
Expand All @@ -286,6 +306,24 @@ public CompletableFuture<GetRecordsResponseAdapter> getDynamoDBStreamsRecords(
});
}

public DescribeStreamResponse describeStreamWithFilter(String streamArn, ShardFilter shardFilter,
String consumerId) {
DescribeStreamRequest describeStreamRequest =
DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder(consumerId)
.streamArn(streamArn)
.shardFilter(shardFilter)
.build();
software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse describeStreamResponse;
try {
describeStreamResponse = internalClient.describeStream(describeStreamRequest);
} catch (AwsServiceException e) {
throw AmazonServiceExceptionTransformer.transformDynamoDBStreamsToKinesisDescribeStream(e);
}
return KinesisMapperUtil.convertDynamoDBDescribeStreamResponseToKinesisDescribeStreamResponse(
describeStreamResponse
);
}

/**
* Sets a value of {@link SkipRecordsBehavior} to decide how the application handles the case when records are lost.
* Default = {@link SkipRecordsBehavior#SKIP_RECORDS_TO_TRIM_HORIZON}
Expand Down
Loading