Skip to content

Commit 993b12b

Browse files
authored
Release 2.0.1 of the DynamoDB Streams Kinesis Adapter (#63)
1 parent 1a416c5 commit 993b12b

14 files changed

+730
-54
lines changed

CHANGELOG.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
### Latest Release (v2.0.0)
1+
### Latest Release (v2.0.1)
2+
* Added support for optional shard filter parameter in DescribeStream api that allows customers to fetch child shards of a read_only parent shard.
3+
* Fixes the [bug](https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/62) where dynamodb-streams-kinesis-adapter was not constructing stream arn properly for china aws partition.
4+
* Bump aws sdk to 2.32.0
25

6+
### Release (v2.0.0)
37
* Major version upgrade to support Amazon Kinesis Client Library (KCL) version 3.x
48
* Upgrades AWS Java SDK to version 2.x, removing dependency on AWS SDK v1
59
* Provides a custom StreamsSchedulerFactory for creating a KCL scheduler optimized for DynamoDB Streams
@@ -70,4 +74,4 @@ To fix high propagation delay problems, opt-into using DynamoDBStreamsProxy (ins
7074
adapterClient,
7175
amazonDynamoDB,
7276
amazonCloudWatchClient);
73-
```
77+
```

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,4 @@ See [CHANGELOG.md](CHANGELOG.md)
6060
[6]: https://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-setup.html
6161
[7]: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html
6262
[8]: http://mvnrepository.com/artifact/com.amazonaws/dynamodb-streams-kinesis-adapter
63-
[9]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
63+
[9]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html

pom.xml

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
77
<packaging>jar</packaging>
88
<name>DynamoDB Streams Adapter for Java</name>
9-
<version>2.0.0</version>
9+
<version>2.0.1</version>
1010
<description>The DynamoDB Streams Adapter implements the AmazonKinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream.</description>
1111
<url>https://aws.amazon.com/dynamodb</url>
1212

@@ -23,10 +23,10 @@
2323
</licenses>
2424

2525
<properties>
26-
<software-kinesis.version>3.1.0</software-kinesis.version>
26+
<software-kinesis.version>3.1.1</software-kinesis.version>
2727
<gpg.skip>true</gpg.skip>
2828
<maven.dependency.version>3.0.0</maven.dependency.version>
29-
<awssdk.version>2.26.19</awssdk.version>
29+
<awssdk.version>2.32.0</awssdk.version>
3030
</properties>
3131

3232
<dependencies>
@@ -242,6 +242,37 @@
242242
</execution>
243243
</executions>
244244
</plugin>
245+
<plugin>
246+
<groupId>org.apache.maven.plugins</groupId>
247+
<artifactId>maven-javadoc-plugin</artifactId>
248+
<version>3.7.0</version>
249+
<executions>
250+
<execution>
251+
<id>attach-javadocs</id>
252+
<goals>
253+
<goal>jar</goal>
254+
</goals>
255+
</execution>
256+
</executions>
257+
<configuration>
258+
<additionalOptions>
259+
<additionalOption>-Xdoclint:none</additionalOption>
260+
</additionalOptions>
261+
</configuration>
262+
</plugin>
263+
<plugin>
264+
<groupId>org.apache.maven.plugins</groupId>
265+
<artifactId>maven-source-plugin</artifactId>
266+
<version>3.2.1</version>
267+
<executions>
268+
<execution>
269+
<id>attach-sources</id>
270+
<goals>
271+
<goal>jar</goal>
272+
</goals>
273+
</execution>
274+
</executions>
275+
</plugin>
245276
</plugins>
246277
</build>
247278

@@ -254,4 +285,50 @@
254285
</dependency>
255286
</dependencies>
256287
</dependencyManagement>
257-
</project>
288+
289+
<distributionManagement>
290+
<snapshotRepository>
291+
<id>central</id>
292+
<url>https://ossrh-staging-api.central.sonatype.com/content/repositories/snapshots</url>
293+
</snapshotRepository>
294+
<repository>
295+
<id>central</id>
296+
<url>https://ossrh-staging-api.central.sonatype.com/service/local/staging/deploy/maven2/</url>
297+
</repository>
298+
</distributionManagement>
299+
300+
<profiles>
301+
<profile>
302+
<id>publishing</id>
303+
<build>
304+
<plugins>
305+
<plugin>
306+
<groupId>org.apache.maven.plugins</groupId>
307+
<artifactId>maven-gpg-plugin</artifactId>
308+
<version>3.2.1</version>
309+
<executions>
310+
<execution>
311+
<id>sign-artifacts</id>
312+
<phase>verify</phase>
313+
<goals>
314+
<goal>sign</goal>
315+
</goals>
316+
</execution>
317+
</executions>
318+
</plugin>
319+
<plugin>
320+
<groupId>org.sonatype.central</groupId>
321+
<artifactId>central-publishing-maven-plugin</artifactId>
322+
<version>0.7.0</version>
323+
<extensions>true</extensions>
324+
<configuration>
325+
<publishingServerId>central</publishingServerId>
326+
<autoPublish>false</autoPublish>
327+
</configuration>
328+
</plugin>
329+
</plugins>
330+
</build>
331+
</profile>
332+
</profiles>
333+
334+
</project>

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import lombok.Getter;
2626
import lombok.extern.slf4j.Slf4j;
2727
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
28+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2829
import software.amazon.awssdk.awscore.exception.AwsServiceException;
30+
import software.amazon.awssdk.core.ApiName;
31+
import software.amazon.awssdk.core.RequestOverrideConfiguration;
2932
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
3033
import software.amazon.awssdk.core.exception.SdkClientException;
3134
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
@@ -34,6 +37,7 @@
3437
import software.amazon.awssdk.retries.api.BackoffStrategy;
3538
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
3639
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
40+
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
3741
import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
3842
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
3943
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@@ -46,8 +50,10 @@
4650
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
4751
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
4852
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
49-
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
53+
5054
import java.time.Duration;
55+
import java.util.Collections;
56+
import java.util.Optional;
5157
import java.util.concurrent.CompletableFuture;
5258

5359
@Slf4j
@@ -71,6 +77,7 @@ public enum SkipRecordsBehavior {
7177
private SkipRecordsBehavior skipRecordsBehavior = SkipRecordsBehavior.SKIP_RECORDS_TO_TRIM_HORIZON;
7278
private static final int MAX_DESCRIBE_STREAM_RETRY_ATTEMPTS = 50;
7379
private static final Duration DESCRIBE_STREAM_CALLS_DELAY = Duration.ofMillis(1000);
80+
private static final String KCL_CONSUMER_ID_PREFIX = "KCL-ConsumerId";
7481

7582
private Region region;
7683

@@ -157,9 +164,10 @@ public void close() {
157164
public CompletableFuture<DescribeStreamResponse> describeStream(
158165
software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest describeStreamRequest)
159166
throws AwsServiceException, SdkClientException {
167+
String consumerId = getConsumerId(describeStreamRequest.overrideConfiguration());
160168
return CompletableFuture.supplyAsync(() -> {
161169
DescribeStreamRequest ddbDescribeStreamRequest =
162-
DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder()
170+
DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder(consumerId)
163171
.streamArn(describeStreamRequest.streamName())
164172
.limit(describeStreamRequest.limit())
165173
.exclusiveStartShardId(describeStreamRequest.exclusiveStartShardId())
@@ -175,6 +183,17 @@ public CompletableFuture<DescribeStreamResponse> describeStream(
175183
});
176184
}
177185

186+
private String getConsumerId(Optional<AwsRequestOverrideConfiguration> overrideConfiguration) {
187+
return overrideConfiguration
188+
.map(RequestOverrideConfiguration::apiNames)
189+
.orElse(Collections.emptyList())
190+
.stream()
191+
.map(ApiName::name)
192+
.filter(name -> name.contains(KCL_CONSUMER_ID_PREFIX))
193+
.findFirst()
194+
.orElse("");
195+
}
196+
178197
/**
179198
* Gets a shard iterator using the provided request from DynamoDB Streams.
180199
* @param getShardIteratorRequest Container for the necessary parameters to execute the GetShardIterator service
@@ -189,8 +208,9 @@ public CompletableFuture<GetShardIteratorResponse> getShardIterator(GetShardIter
189208

190209
private GetShardIteratorResponse getShardIteratorResponse(GetShardIteratorRequest getShardIteratorRequest)
191210
throws AwsServiceException, SdkClientException {
211+
String consumerId = getConsumerId(getShardIteratorRequest.overrideConfiguration());
192212
software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest ddbGetShardIteratorRequest =
193-
DynamoDBStreamsRequestsBuilder.getShardIteratorRequestBuilder()
213+
DynamoDBStreamsRequestsBuilder.getShardIteratorRequestBuilder(consumerId)
194214
.streamArn(getShardIteratorRequest.streamName())
195215
.shardIteratorType(getShardIteratorRequest.shardIteratorTypeAsString())
196216
.shardId(getShardIteratorRequest.shardId())
@@ -271,7 +291,7 @@ public CompletableFuture<GetRecordsResponse> getRecords(GetRecordsRequest getRec
271291
+ " See getDynamoDBStreamsRecords function");
272292
}
273293

274-
public CompletableFuture<GetRecordsResponseAdapter> getDynamoDBStreamsRecords(
294+
public CompletableFuture<DynamoDBStreamsGetRecordsResponseAdapter> getDynamoDBStreamsRecords(
275295
software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest ddbGetRecordsRequest
276296
) throws AwsServiceException, SdkClientException {
277297
return CompletableFuture.supplyAsync(() -> {
@@ -286,6 +306,24 @@ public CompletableFuture<GetRecordsResponseAdapter> getDynamoDBStreamsRecords(
286306
});
287307
}
288308

309+
public DescribeStreamResponse describeStreamWithFilter(String streamArn, ShardFilter shardFilter,
310+
String consumerId) {
311+
DescribeStreamRequest describeStreamRequest =
312+
DynamoDBStreamsRequestsBuilder.describeStreamRequestBuilder(consumerId)
313+
.streamArn(streamArn)
314+
.shardFilter(shardFilter)
315+
.build();
316+
software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse describeStreamResponse;
317+
try {
318+
describeStreamResponse = internalClient.describeStream(describeStreamRequest);
319+
} catch (AwsServiceException e) {
320+
throw AmazonServiceExceptionTransformer.transformDynamoDBStreamsToKinesisDescribeStream(e);
321+
}
322+
return KinesisMapperUtil.convertDynamoDBDescribeStreamResponseToKinesisDescribeStreamResponse(
323+
describeStreamResponse
324+
);
325+
}
326+
289327
/**
290328
* Sets a value of {@link SkipRecordsBehavior} to decide how the application handles the case when records are lost.
291329
* Default = {@link SkipRecordsBehavior#SKIP_RECORDS_TO_TRIM_HORIZON}

0 commit comments

Comments
 (0)