1616package software .amazon .awssdk .services .kinesis ;
1717
1818import static org .junit .Assert .assertThat ;
19+ import static org .junit .Assert .assertTrue ;
1920
2021import java .math .BigInteger ;
2122import java .time .Duration ;
2223import java .time .Instant ;
2324import java .util .List ;
2425import org .hamcrest .Matchers ;
2526import org .junit .Assert ;
26- import org .junit .Ignore ;
2727import org .junit .Test ;
2828import software .amazon .awssdk .core .SdkBytes ;
2929import software .amazon .awssdk .core .exception .SdkServiceException ;
3737import software .amazon .awssdk .services .kinesis .model .GetShardIteratorResponse ;
3838import software .amazon .awssdk .services .kinesis .model .HashKeyRange ;
3939import software .amazon .awssdk .services .kinesis .model .InvalidArgumentException ;
40- import software .amazon .awssdk .services .kinesis .model .ListStreamsRequest ;
41- import software .amazon .awssdk .services .kinesis .model .ListStreamsResponse ;
42- import software .amazon .awssdk .services .kinesis .model .MergeShardsRequest ;
4340import software .amazon .awssdk .services .kinesis .model .PutRecordRequest ;
4441import software .amazon .awssdk .services .kinesis .model .PutRecordResponse ;
4542import software .amazon .awssdk .services .kinesis .model .Record ;
4643import software .amazon .awssdk .services .kinesis .model .ResourceNotFoundException ;
4744import software .amazon .awssdk .services .kinesis .model .SequenceNumberRange ;
4845import software .amazon .awssdk .services .kinesis .model .Shard ;
4946import software .amazon .awssdk .services .kinesis .model .ShardIteratorType ;
50- import software .amazon .awssdk .services .kinesis .model .SplitShardRequest ;
5147import software .amazon .awssdk .services .kinesis .model .StreamDescription ;
5248import software .amazon .awssdk .services .kinesis .model .StreamStatus ;
5349
@@ -105,45 +101,29 @@ public void testGetFromBogusIterator() {
105101 } catch (InvalidArgumentException exception ) {
106102 // Ignored or expected.
107103 }
108-
109104 }
110105
111106 @ Test
112- @ Ignore
113- public void testKinesisOperations () throws Exception {
107+ public void testCreatePutGetDelete () throws Exception {
114108 String streamName = "java-test-stream-" + System .currentTimeMillis ();
115109 boolean created = false ;
116110
117111 try {
118112
119113 // Create a stream with one shard.
120- System .out .println ("Creating Stream..." );
121114 client .createStream (CreateStreamRequest .builder ().streamName (streamName ).shardCount (1 ).build ());
122- System .out .println (" OK" );
123115 created = true ;
124116
125- // Verify that it shows up in a list call.
126- findStreamInList (streamName );
127-
128117 // Wait for it to become ACTIVE.
129- System .out .println ("Waiting for stream to become active..." );
130118 List <Shard > shards = waitForStream (streamName );
131- System .out .println (" OK" );
132119
133120 Assert .assertEquals (1 , shards .size ());
134121 Shard shard = shards .get (0 );
135122
136- // Just to be really sure in case of eventual consistency...
137- Thread .sleep (5000 );
138-
139- testPuts (streamName , shard );
123+ putRecord (streamName , "See No Evil" );
124+ putRecord (streamName , "Hear No Evil" );
140125
141- // Wait a bit to make sure the records propagate.
142- Thread .sleep (5000 );
143-
144- System .out .println ("Reading..." );
145126 testGets (streamName , shard );
146- System .out .println (" OK" );
147127
148128 } finally {
149129 if (created ) {
@@ -152,7 +132,8 @@ public void testKinesisOperations() throws Exception {
152132 }
153133 }
154134
155- private void testGets (final String streamName , final Shard shard ) {
135+ private void testGets (final String streamName , final Shard shard ) throws InterruptedException {
136+ // Wait for the shard to be in an active state
156137 // Get an iterator for the first shard.
157138 GetShardIteratorResponse iteratorResult = client .getShardIterator (
158139 GetShardIteratorRequest .builder ()
@@ -166,6 +147,19 @@ private void testGets(final String streamName, final Shard shard) {
166147 String iterator = iteratorResult .shardIterator ();
167148 Assert .assertNotNull (iterator );
168149
150+ GetRecordsResponse result = getOneRecord (iterator );
151+ validateRecord (result .records ().get (0 ), "See No Evil" );
152+
153+ result = getOneRecord (result .nextShardIterator ());
154+ validateRecord (result .records ().get (0 ), "Hear No Evil" );
155+
156+ result = client .getRecords (GetRecordsRequest .builder ()
157+ .shardIterator (result .nextShardIterator ())
158+ .build ());
159+ assertTrue (result .records ().isEmpty ());
160+ }
161+
162+ private GetRecordsResponse getOneRecord (String iterator ) {
169163 int tries = 0 ;
170164 GetRecordsResponse result ;
171165 List <Record > records ;
@@ -203,31 +197,7 @@ private void testGets(final String streamName, final Shard shard) {
203197
204198 iterator = result .nextShardIterator ();
205199 }
206-
207- System .out .println (" [Succeeded after " + tries + " tries]" );
208- Assert .assertEquals (1 , records .size ());
209- validateRecord (records .get (0 ), "See No Evil" );
210-
211- // Read the second record from the first shard.
212- result = client .getRecords (GetRecordsRequest .builder ()
213- .shardIterator (result .nextShardIterator ())
214- .build ());
215- Assert .assertNotNull (result );
216- Assert .assertNotNull (result .records ());
217- Assert .assertNotNull (result .nextShardIterator ());
218-
219- records = result .records ();
220- Assert .assertEquals (1 , records .size ());
221- validateRecord (records .get (0 ), "See No Evil" );
222-
223- // Try to read some more, get EOF.
224- result = client .getRecords (GetRecordsRequest .builder ()
225- .shardIterator (result .nextShardIterator ())
226- .build ());
227- Assert .assertNotNull (result );
228- Assert .assertNotNull (result .records ());
229- Assert .assertTrue (result .records ().isEmpty ());
230- Assert .assertNull (result .nextShardIterator ());
200+ return result ;
231201 }
232202
233203 private void validateRecord (final Record record , String data ) {
@@ -245,134 +215,8 @@ private void validateRecord(final Record record, String data) {
245215 Assert .assertTrue (Duration .between (record .approximateArrivalTimestamp (), Instant .now ()).toMinutes () < 5 );
246216 }
247217
248- private void testPuts (final String streamName , final Shard shard )
249- throws InterruptedException {
250-
251- // Put a record into the shard.
252- System .out .println ("Putting two records..." );
253- PutRecordResponse r1 = putRecord (streamName , "See No Evil" );
254- Assert .assertEquals (shard .shardId (), r1 .shardId ());
255-
256- // Check that it's sequence number is sane.
257- BigInteger startingSQN = new BigInteger (
258- shard .sequenceNumberRange ().startingSequenceNumber ()
259- );
260- BigInteger sqn1 = new BigInteger (r1 .sequenceNumber ());
261- Assert .assertTrue (sqn1 .compareTo (startingSQN ) >= 0 );
262-
263- // Put another record, which should show up later in the same shard.
264- PutRecordResponse r2 = putRecord (streamName , "See No Evil" );
265- Assert .assertEquals (shard .shardId (), r2 .shardId ());
266- BigInteger sqn2 = new BigInteger (r2 .sequenceNumber ());
267- System .out .println (" OK" );
268-
269- // Not guaranteed an order unless we explicitly ask for one, but
270- // it has to at least be larger than the starting sqn.
271- Assert .assertTrue (sqn2 .compareTo (startingSQN ) >= 0 );
272-
273- // Split the shard in two: [0-1000) and [1000-*]
274- System .out .println ("Splitting the shard..." );
275- List <Shard > shards = splitShard (streamName , shard , 1000 );
276- System .out .println (" OK" );
277-
278- // Sleep a bit for eventual consistency.
279- Thread .sleep (5000 );
280-
281-
282- // Put records into the two new shards, one after another.
283- System .out .println ("Putting some more..." );
284- PutRecordResponse r3 = putRecordExplicit (streamName , "999" );
285- PutRecordResponse r4 = putRecordExplicit (streamName ,
286- "1000" ,
287- r3 .sequenceNumber ());
288-
289- BigInteger sqn3 = new BigInteger (r3 .sequenceNumber ());
290- BigInteger sqn4 = new BigInteger (r4 .sequenceNumber ());
291- Assert .assertTrue (sqn4 .compareTo (sqn3 ) >= 0 );
292- System .out .println (" OK" );
293-
294- // Merge the two shards back together.
295- System .out .println ("Merging the shards back together..." );
296- mergeShards (streamName ,
297- shards .get (1 ).shardId (),
298- shards .get (2 ).shardId ());
299- System .out .println (" OK" );
300- }
301-
302-
303- private List <Shard > splitShard (final String streamName ,
304- final Shard shard ,
305- final long splitHashKey )
306- throws InterruptedException {
307-
308- client .splitShard (SplitShardRequest .builder ()
309- .streamName (streamName )
310- .shardToSplit (shard .shardId ())
311- .newStartingHashKey (Long .toString (splitHashKey ))
312- .build ());
313-
314- List <Shard > shards = waitForStream (streamName );
315-
316- Assert .assertEquals (3 , shards .size ());
317-
318- Shard old = shards .get (0 );
319- Assert .assertEquals (shard .shardId (), old .shardId ());
320- Assert .assertNotNull (
321- old .sequenceNumberRange ().endingSequenceNumber ()
322- );
323-
324- Shard new1 = shards .get (1 );
325- Assert .assertEquals (shard .shardId (), new1 .parentShardId ());
326- validateHashKeyRange (new1 .hashKeyRange (), 0L , splitHashKey - 1 );
327-
328- Shard new2 = shards .get (2 );
329- Assert .assertEquals (shard .shardId (), new2 .parentShardId ());
330- validateHashKeyRange (new2 .hashKeyRange (), splitHashKey , null );
331- Assert .assertEquals (old .hashKeyRange ().endingHashKey (),
332- new2 .hashKeyRange ().endingHashKey ());
333-
334- return shards ;
335- }
336-
337- private List <Shard > mergeShards (final String streamName ,
338- final String shard1 ,
339- final String shard2 )
340- throws InterruptedException {
341-
342- client .mergeShards (MergeShardsRequest .builder ()
343- .streamName (streamName )
344- .shardToMerge (shard1 )
345- .adjacentShardToMerge (shard2 )
346- .build ());
347218
348- List <Shard > shards = waitForStream (streamName );
349219
350- Assert .assertEquals (4 , shards .size ());
351- Shard merged = shards .get (3 );
352-
353- BigInteger start =
354- new BigInteger (merged .hashKeyRange ().startingHashKey ());
355- BigInteger end =
356- new BigInteger (merged .hashKeyRange ().endingHashKey ());
357-
358- Assert .assertEquals (BigInteger .valueOf (0 ), start );
359- Assert .assertTrue (end .compareTo (BigInteger .valueOf (1000 )) >= 0 );
360-
361- return shards ;
362- }
363-
364- private void validateHashKeyRange (final HashKeyRange range ,
365- final Long start ,
366- final Long end ) {
367- if (start != null ) {
368- Assert .assertEquals (BigInteger .valueOf (start ),
369- new BigInteger (range .startingHashKey ()));
370- }
371- if (end != null ) {
372- Assert .assertEquals (BigInteger .valueOf (end ),
373- new BigInteger (range .endingHashKey ()));
374- }
375- }
376220
377221 private PutRecordResponse putRecord (final String streamName ,
378222 final String data ) {
@@ -391,71 +235,6 @@ private PutRecordResponse putRecord(final String streamName,
391235 return result ;
392236 }
393237
394- private PutRecordResponse putRecordExplicit (final String streamName ,
395- final String hashKey ) {
396-
397- PutRecordResponse result = client .putRecord (PutRecordRequest .builder ()
398- .streamName (streamName )
399- .partitionKey ("foobar" )
400- .explicitHashKey (hashKey )
401- .data (SdkBytes .fromUtf8String ("Speak No Evil" ))
402- .build ());
403- Assert .assertNotNull (result );
404-
405- Assert .assertNotNull (result .shardId ());
406- Assert .assertNotNull (result .sequenceNumber ());
407-
408- return result ;
409- }
410-
411- private PutRecordResponse putRecordExplicit (final String streamName ,
412- final String hashKey ,
413- final String minSQN ) {
414-
415- PutRecordResponse result = client .putRecord (PutRecordRequest .builder ()
416- .streamName (streamName )
417- .partitionKey ("foobar" )
418- .explicitHashKey (hashKey )
419- .sequenceNumberForOrdering (minSQN )
420- .data (SdkBytes .fromUtf8String ("Hear No Evil" ))
421- .build ());
422- Assert .assertNotNull (result );
423-
424- Assert .assertNotNull (result .shardId ());
425- Assert .assertNotNull (result .sequenceNumber ());
426-
427- return result ;
428- }
429-
430- private void findStreamInList (final String streamName ) {
431- boolean found = false ;
432-
433- String start = null ;
434- while (true ) {
435-
436- ListStreamsResponse result = client .listStreams (ListStreamsRequest .builder ().exclusiveStartStreamName (start ).build ());
437-
438- Assert .assertNotNull (result );
439-
440- List <String > names = result .streamNames ();
441- Assert .assertNotNull (names );
442-
443- if (names .size () > 0 ) {
444- if (names .contains (streamName )) {
445- found = true ;
446- }
447-
448- start = names .get (names .size () - 1 );
449- }
450-
451- if (!result .hasMoreStreams ()) {
452- break ;
453- }
454-
455- }
456-
457- Assert .assertTrue (found );
458- }
459238
460239 private List <Shard > waitForStream (final String streamName )
461240 throws InterruptedException {
0 commit comments