diff --git a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/KinesisIntegrationTests.java b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/KinesisIntegrationTests.java index a5691835ad5b..3237cd52b132 100644 --- a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/KinesisIntegrationTests.java +++ b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/KinesisIntegrationTests.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.services.kinesis; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.math.BigInteger; import java.time.Duration; @@ -23,7 +24,6 @@ import java.util.List; import org.hamcrest.Matchers; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.exception.SdkServiceException; @@ -37,9 +37,6 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; -import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; -import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; -import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; import software.amazon.awssdk.services.kinesis.model.Record; @@ -47,7 +44,6 @@ import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; import software.amazon.awssdk.services.kinesis.model.StreamDescription; import software.amazon.awssdk.services.kinesis.model.StreamStatus; @@ -105,45 +101,29 @@ public void testGetFromBogusIterator() { } catch (InvalidArgumentException exception) { // Ignored or expected. } - } @Test - @Ignore - public void testKinesisOperations() throws Exception { + public void testCreatePutGetDelete() throws Exception { String streamName = "java-test-stream-" + System.currentTimeMillis(); boolean created = false; try { // Create a stream with one shard. - System.out.println("Creating Stream..."); client.createStream(CreateStreamRequest.builder().streamName(streamName).shardCount(1).build()); - System.out.println(" OK"); created = true; - // Verify that it shows up in a list call. - findStreamInList(streamName); - // Wait for it to become ACTIVE. - System.out.println("Waiting for stream to become active..."); List shards = waitForStream(streamName); - System.out.println(" OK"); Assert.assertEquals(1, shards.size()); Shard shard = shards.get(0); - // Just to be really sure in case of eventual consistency... - Thread.sleep(5000); - - testPuts(streamName, shard); + putRecord(streamName, "See No Evil"); + putRecord(streamName, "Hear No Evil"); - // Wait a bit to make sure the records propagate. - Thread.sleep(5000); - - System.out.println("Reading..."); testGets(streamName, shard); - System.out.println(" OK"); } finally { if (created) { @@ -152,7 +132,8 @@ public void testKinesisOperations() throws Exception { } } - private void testGets(final String streamName, final Shard shard) { + private void testGets(final String streamName, final Shard shard) throws InterruptedException { + // Wait for the shard to be in an active state // Get an iterator for the first shard. GetShardIteratorResponse iteratorResult = client.getShardIterator( GetShardIteratorRequest.builder() @@ -166,6 +147,19 @@ private void testGets(final String streamName, final Shard shard) { String iterator = iteratorResult.shardIterator(); Assert.assertNotNull(iterator); + GetRecordsResponse result = getOneRecord(iterator); + validateRecord(result.records().get(0), "See No Evil"); + + result = getOneRecord(result.nextShardIterator()); + validateRecord(result.records().get(0), "Hear No Evil"); + + result = client.getRecords(GetRecordsRequest.builder() + .shardIterator(result.nextShardIterator()) + .build()); + assertTrue(result.records().isEmpty()); + } + + private GetRecordsResponse getOneRecord(String iterator) { int tries = 0; GetRecordsResponse result; List records; @@ -203,31 +197,7 @@ private void testGets(final String streamName, final Shard shard) { iterator = result.nextShardIterator(); } - - System.out.println(" [Succeeded after " + tries + " tries]"); - Assert.assertEquals(1, records.size()); - validateRecord(records.get(0), "See No Evil"); - - // Read the second record from the first shard. - result = client.getRecords(GetRecordsRequest.builder() - .shardIterator(result.nextShardIterator()) - .build()); - Assert.assertNotNull(result); - Assert.assertNotNull(result.records()); - Assert.assertNotNull(result.nextShardIterator()); - - records = result.records(); - Assert.assertEquals(1, records.size()); - validateRecord(records.get(0), "See No Evil"); - - // Try to read some more, get EOF. - result = client.getRecords(GetRecordsRequest.builder() - .shardIterator(result.nextShardIterator()) - .build()); - Assert.assertNotNull(result); - Assert.assertNotNull(result.records()); - Assert.assertTrue(result.records().isEmpty()); - Assert.assertNull(result.nextShardIterator()); + return result; } private void validateRecord(final Record record, String data) { @@ -245,134 +215,8 @@ private void validateRecord(final Record record, String data) { Assert.assertTrue(Duration.between(record.approximateArrivalTimestamp(), Instant.now()).toMinutes() < 5); } - private void testPuts(final String streamName, final Shard shard) - throws InterruptedException { - - // Put a record into the shard. - System.out.println("Putting two records..."); - PutRecordResponse r1 = putRecord(streamName, "See No Evil"); - Assert.assertEquals(shard.shardId(), r1.shardId()); - - // Check that it's sequence number is sane. - BigInteger startingSQN = new BigInteger( - shard.sequenceNumberRange().startingSequenceNumber() - ); - BigInteger sqn1 = new BigInteger(r1.sequenceNumber()); - Assert.assertTrue(sqn1.compareTo(startingSQN) >= 0); - - // Put another record, which should show up later in the same shard. - PutRecordResponse r2 = putRecord(streamName, "See No Evil"); - Assert.assertEquals(shard.shardId(), r2.shardId()); - BigInteger sqn2 = new BigInteger(r2.sequenceNumber()); - System.out.println(" OK"); - - // Not guaranteed an order unless we explicitly ask for one, but - // it has to at least be larger than the starting sqn. - Assert.assertTrue(sqn2.compareTo(startingSQN) >= 0); - - // Split the shard in two: [0-1000) and [1000-*] - System.out.println("Splitting the shard..."); - List shards = splitShard(streamName, shard, 1000); - System.out.println(" OK"); - - // Sleep a bit for eventual consistency. - Thread.sleep(5000); - - - // Put records into the two new shards, one after another. - System.out.println("Putting some more..."); - PutRecordResponse r3 = putRecordExplicit(streamName, "999"); - PutRecordResponse r4 = putRecordExplicit(streamName, - "1000", - r3.sequenceNumber()); - - BigInteger sqn3 = new BigInteger(r3.sequenceNumber()); - BigInteger sqn4 = new BigInteger(r4.sequenceNumber()); - Assert.assertTrue(sqn4.compareTo(sqn3) >= 0); - System.out.println(" OK"); - - // Merge the two shards back together. - System.out.println("Merging the shards back together..."); - mergeShards(streamName, - shards.get(1).shardId(), - shards.get(2).shardId()); - System.out.println(" OK"); - } - - - private List splitShard(final String streamName, - final Shard shard, - final long splitHashKey) - throws InterruptedException { - - client.splitShard(SplitShardRequest.builder() - .streamName(streamName) - .shardToSplit(shard.shardId()) - .newStartingHashKey(Long.toString(splitHashKey)) - .build()); - - List shards = waitForStream(streamName); - - Assert.assertEquals(3, shards.size()); - - Shard old = shards.get(0); - Assert.assertEquals(shard.shardId(), old.shardId()); - Assert.assertNotNull( - old.sequenceNumberRange().endingSequenceNumber() - ); - - Shard new1 = shards.get(1); - Assert.assertEquals(shard.shardId(), new1.parentShardId()); - validateHashKeyRange(new1.hashKeyRange(), 0L, splitHashKey - 1); - - Shard new2 = shards.get(2); - Assert.assertEquals(shard.shardId(), new2.parentShardId()); - validateHashKeyRange(new2.hashKeyRange(), splitHashKey, null); - Assert.assertEquals(old.hashKeyRange().endingHashKey(), - new2.hashKeyRange().endingHashKey()); - - return shards; - } - - private List mergeShards(final String streamName, - final String shard1, - final String shard2) - throws InterruptedException { - - client.mergeShards(MergeShardsRequest.builder() - .streamName(streamName) - .shardToMerge(shard1) - .adjacentShardToMerge(shard2) - .build()); - List shards = waitForStream(streamName); - Assert.assertEquals(4, shards.size()); - Shard merged = shards.get(3); - - BigInteger start = - new BigInteger(merged.hashKeyRange().startingHashKey()); - BigInteger end = - new BigInteger(merged.hashKeyRange().endingHashKey()); - - Assert.assertEquals(BigInteger.valueOf(0), start); - Assert.assertTrue(end.compareTo(BigInteger.valueOf(1000)) >= 0); - - return shards; - } - - private void validateHashKeyRange(final HashKeyRange range, - final Long start, - final Long end) { - if (start != null) { - Assert.assertEquals(BigInteger.valueOf(start), - new BigInteger(range.startingHashKey())); - } - if (end != null) { - Assert.assertEquals(BigInteger.valueOf(end), - new BigInteger(range.endingHashKey())); - } - } private PutRecordResponse putRecord(final String streamName, final String data) { @@ -391,71 +235,6 @@ private PutRecordResponse putRecord(final String streamName, return result; } - private PutRecordResponse putRecordExplicit(final String streamName, - final String hashKey) { - - PutRecordResponse result = client.putRecord(PutRecordRequest.builder() - .streamName(streamName) - .partitionKey("foobar") - .explicitHashKey(hashKey) - .data(SdkBytes.fromUtf8String("Speak No Evil")) - .build()); - Assert.assertNotNull(result); - - Assert.assertNotNull(result.shardId()); - Assert.assertNotNull(result.sequenceNumber()); - - return result; - } - - private PutRecordResponse putRecordExplicit(final String streamName, - final String hashKey, - final String minSQN) { - - PutRecordResponse result = client.putRecord(PutRecordRequest.builder() - .streamName(streamName) - .partitionKey("foobar") - .explicitHashKey(hashKey) - .sequenceNumberForOrdering(minSQN) - .data(SdkBytes.fromUtf8String("Hear No Evil")) - .build()); - Assert.assertNotNull(result); - - Assert.assertNotNull(result.shardId()); - Assert.assertNotNull(result.sequenceNumber()); - - return result; - } - - private void findStreamInList(final String streamName) { - boolean found = false; - - String start = null; - while (true) { - - ListStreamsResponse result = client.listStreams(ListStreamsRequest.builder().exclusiveStartStreamName(start).build()); - - Assert.assertNotNull(result); - - List names = result.streamNames(); - Assert.assertNotNull(names); - - if (names.size() > 0) { - if (names.contains(streamName)) { - found = true; - } - - start = names.get(names.size() - 1); - } - - if (!result.hasMoreStreams()) { - break; - } - - } - - Assert.assertTrue(found); - } private List waitForStream(final String streamName) throws InterruptedException {