Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
package software.amazon.awssdk.services.kinesis;

import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.math.BigInteger;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkServiceException;
Expand All @@ -37,17 +37,13 @@
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.SplitShardRequest;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

Expand Down Expand Up @@ -105,45 +101,29 @@ public void testGetFromBogusIterator() {
} catch (InvalidArgumentException exception) {
// Ignored or expected.
}

}

@Test
@Ignore
public void testKinesisOperations() throws Exception {
public void testCreatePutGetDelete() throws Exception {
String streamName = "java-test-stream-" + System.currentTimeMillis();
boolean created = false;

try {

// Create a stream with one shard.
System.out.println("Creating Stream...");
client.createStream(CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
System.out.println(" OK");
created = true;

// Verify that it shows up in a list call.
findStreamInList(streamName);

// Wait for it to become ACTIVE.
System.out.println("Waiting for stream to become active...");
List<Shard> shards = waitForStream(streamName);
System.out.println(" OK");

Assert.assertEquals(1, shards.size());
Shard shard = shards.get(0);

// Just to be really sure in case of eventual consistency...
Thread.sleep(5000);

testPuts(streamName, shard);
putRecord(streamName, "See No Evil");
putRecord(streamName, "Hear No Evil");

// Wait a bit to make sure the records propagate.
Thread.sleep(5000);

System.out.println("Reading...");
testGets(streamName, shard);
System.out.println(" OK");

} finally {
if (created) {
Expand All @@ -152,7 +132,8 @@ public void testKinesisOperations() throws Exception {
}
}

private void testGets(final String streamName, final Shard shard) {
private void testGets(final String streamName, final Shard shard) throws InterruptedException {
// Wait for the shard to be in an active state
// Get an iterator for the first shard.
GetShardIteratorResponse iteratorResult = client.getShardIterator(
GetShardIteratorRequest.builder()
Expand All @@ -166,6 +147,19 @@ private void testGets(final String streamName, final Shard shard) {
String iterator = iteratorResult.shardIterator();
Assert.assertNotNull(iterator);

GetRecordsResponse result = getOneRecord(iterator);
validateRecord(result.records().get(0), "See No Evil");

result = getOneRecord(result.nextShardIterator());
validateRecord(result.records().get(0), "Hear No Evil");

result = client.getRecords(GetRecordsRequest.builder()
.shardIterator(result.nextShardIterator())
.build());
assertTrue(result.records().isEmpty());
}

private GetRecordsResponse getOneRecord(String iterator) {
int tries = 0;
GetRecordsResponse result;
List<Record> records;
Expand Down Expand Up @@ -203,31 +197,7 @@ private void testGets(final String streamName, final Shard shard) {

iterator = result.nextShardIterator();
}

System.out.println(" [Succeeded after " + tries + " tries]");
Assert.assertEquals(1, records.size());
validateRecord(records.get(0), "See No Evil");

// Read the second record from the first shard.
result = client.getRecords(GetRecordsRequest.builder()
.shardIterator(result.nextShardIterator())
.build());
Assert.assertNotNull(result);
Assert.assertNotNull(result.records());
Assert.assertNotNull(result.nextShardIterator());

records = result.records();
Assert.assertEquals(1, records.size());
validateRecord(records.get(0), "See No Evil");

// Try to read some more, get EOF.
result = client.getRecords(GetRecordsRequest.builder()
.shardIterator(result.nextShardIterator())
.build());
Assert.assertNotNull(result);
Assert.assertNotNull(result.records());
Assert.assertTrue(result.records().isEmpty());
Assert.assertNull(result.nextShardIterator());
return result;
}

private void validateRecord(final Record record, String data) {
Expand All @@ -245,134 +215,8 @@ private void validateRecord(final Record record, String data) {
Assert.assertTrue(Duration.between(record.approximateArrivalTimestamp(), Instant.now()).toMinutes() < 5);
}

private void testPuts(final String streamName, final Shard shard)
throws InterruptedException {

// Put a record into the shard.
System.out.println("Putting two records...");
PutRecordResponse r1 = putRecord(streamName, "See No Evil");
Assert.assertEquals(shard.shardId(), r1.shardId());

// Check that it's sequence number is sane.
BigInteger startingSQN = new BigInteger(
shard.sequenceNumberRange().startingSequenceNumber()
);
BigInteger sqn1 = new BigInteger(r1.sequenceNumber());
Assert.assertTrue(sqn1.compareTo(startingSQN) >= 0);

// Put another record, which should show up later in the same shard.
PutRecordResponse r2 = putRecord(streamName, "See No Evil");
Assert.assertEquals(shard.shardId(), r2.shardId());
BigInteger sqn2 = new BigInteger(r2.sequenceNumber());
System.out.println(" OK");

// Not guaranteed an order unless we explicitly ask for one, but
// it has to at least be larger than the starting sqn.
Assert.assertTrue(sqn2.compareTo(startingSQN) >= 0);

// Split the shard in two: [0-1000) and [1000-*]
System.out.println("Splitting the shard...");
List<Shard> shards = splitShard(streamName, shard, 1000);
System.out.println(" OK");

// Sleep a bit for eventual consistency.
Thread.sleep(5000);


// Put records into the two new shards, one after another.
System.out.println("Putting some more...");
PutRecordResponse r3 = putRecordExplicit(streamName, "999");
PutRecordResponse r4 = putRecordExplicit(streamName,
"1000",
r3.sequenceNumber());

BigInteger sqn3 = new BigInteger(r3.sequenceNumber());
BigInteger sqn4 = new BigInteger(r4.sequenceNumber());
Assert.assertTrue(sqn4.compareTo(sqn3) >= 0);
System.out.println(" OK");

// Merge the two shards back together.
System.out.println("Merging the shards back together...");
mergeShards(streamName,
shards.get(1).shardId(),
shards.get(2).shardId());
System.out.println(" OK");
}


private List<Shard> splitShard(final String streamName,
final Shard shard,
final long splitHashKey)
throws InterruptedException {

client.splitShard(SplitShardRequest.builder()
.streamName(streamName)
.shardToSplit(shard.shardId())
.newStartingHashKey(Long.toString(splitHashKey))
.build());

List<Shard> shards = waitForStream(streamName);

Assert.assertEquals(3, shards.size());

Shard old = shards.get(0);
Assert.assertEquals(shard.shardId(), old.shardId());
Assert.assertNotNull(
old.sequenceNumberRange().endingSequenceNumber()
);

Shard new1 = shards.get(1);
Assert.assertEquals(shard.shardId(), new1.parentShardId());
validateHashKeyRange(new1.hashKeyRange(), 0L, splitHashKey - 1);

Shard new2 = shards.get(2);
Assert.assertEquals(shard.shardId(), new2.parentShardId());
validateHashKeyRange(new2.hashKeyRange(), splitHashKey, null);
Assert.assertEquals(old.hashKeyRange().endingHashKey(),
new2.hashKeyRange().endingHashKey());

return shards;
}

private List<Shard> mergeShards(final String streamName,
final String shard1,
final String shard2)
throws InterruptedException {

client.mergeShards(MergeShardsRequest.builder()
.streamName(streamName)
.shardToMerge(shard1)
.adjacentShardToMerge(shard2)
.build());

List<Shard> shards = waitForStream(streamName);

Assert.assertEquals(4, shards.size());
Shard merged = shards.get(3);

BigInteger start =
new BigInteger(merged.hashKeyRange().startingHashKey());
BigInteger end =
new BigInteger(merged.hashKeyRange().endingHashKey());

Assert.assertEquals(BigInteger.valueOf(0), start);
Assert.assertTrue(end.compareTo(BigInteger.valueOf(1000)) >= 0);

return shards;
}

private void validateHashKeyRange(final HashKeyRange range,
final Long start,
final Long end) {
if (start != null) {
Assert.assertEquals(BigInteger.valueOf(start),
new BigInteger(range.startingHashKey()));
}
if (end != null) {
Assert.assertEquals(BigInteger.valueOf(end),
new BigInteger(range.endingHashKey()));
}
}

private PutRecordResponse putRecord(final String streamName,
final String data) {
Expand All @@ -391,71 +235,6 @@ private PutRecordResponse putRecord(final String streamName,
return result;
}

private PutRecordResponse putRecordExplicit(final String streamName,
final String hashKey) {

PutRecordResponse result = client.putRecord(PutRecordRequest.builder()
.streamName(streamName)
.partitionKey("foobar")
.explicitHashKey(hashKey)
.data(SdkBytes.fromUtf8String("Speak No Evil"))
.build());
Assert.assertNotNull(result);

Assert.assertNotNull(result.shardId());
Assert.assertNotNull(result.sequenceNumber());

return result;
}

private PutRecordResponse putRecordExplicit(final String streamName,
final String hashKey,
final String minSQN) {

PutRecordResponse result = client.putRecord(PutRecordRequest.builder()
.streamName(streamName)
.partitionKey("foobar")
.explicitHashKey(hashKey)
.sequenceNumberForOrdering(minSQN)
.data(SdkBytes.fromUtf8String("Hear No Evil"))
.build());
Assert.assertNotNull(result);

Assert.assertNotNull(result.shardId());
Assert.assertNotNull(result.sequenceNumber());

return result;
}

private void findStreamInList(final String streamName) {
boolean found = false;

String start = null;
while (true) {

ListStreamsResponse result = client.listStreams(ListStreamsRequest.builder().exclusiveStartStreamName(start).build());

Assert.assertNotNull(result);

List<String> names = result.streamNames();
Assert.assertNotNull(names);

if (names.size() > 0) {
if (names.contains(streamName)) {
found = true;
}

start = names.get(names.size() - 1);
}

if (!result.hasMoreStreams()) {
break;
}

}

Assert.assertTrue(found);
}

private List<Shard> waitForStream(final String streamName)
throws InterruptedException {
Expand Down