@@ -35,6 +35,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -78,6 +79,7 @@
 import org.springframework.kafka.listener.config.ContainerProperties;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.TopicPartitionInitialOffset;
+import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -1677,6 +1679,47 @@ public void testPauseResume() throws Exception {
 		container.stop();
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testInitialSeek() throws Exception {
+		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
+		Consumer<Integer, String> consumer = mock(Consumer.class);
+		given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
+		ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
+		final CountDownLatch latch = new CountDownLatch(1);
+		given(consumer.poll(anyLong())).willAnswer(i -> {
+			latch.countDown(); // signal that the container has started polling
+			Thread.sleep(50);
+			return emptyRecords;
+		});
+		TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] { // relative and absolute seeks
+				new TopicPartitionInitialOffset("foo", 0, SeekPosition.BEGINNING),
+				new TopicPartitionInitialOffset("foo", 1, SeekPosition.END),
+				new TopicPartitionInitialOffset("foo", 2, 0L),
+				new TopicPartitionInitialOffset("foo", 3, Long.MAX_VALUE),
+				new TopicPartitionInitialOffset("foo", 4, SeekPosition.BEGINNING),
+				new TopicPartitionInitialOffset("foo", 5, SeekPosition.END),
+		};
+		ContainerProperties containerProps = new ContainerProperties(topicPartition);
+		containerProps.setAckMode(AckMode.RECORD);
+		containerProps.setClientId("clientId");
+		containerProps.setMessageListener((MessageListener) r -> { });
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.start();
+		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+		ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(List.class);
+		verify(consumer).seekToBeginning(captor.capture());
+		assertThat(new HashSet<>(captor.getValue()))
+				.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4))));
+		verify(consumer).seekToEnd(captor.capture());
+		assertThat(new HashSet<>(captor.getValue()))
+				.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5))));
+		verify(consumer).seek(new TopicPartition("foo", 2), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
+		container.stop();
+	}
+
 	@Test
 	public void testExceptionWhenCommitAfterRebalance() throws Exception {
 		final CountDownLatch rebalanceLatch = new CountDownLatch(2);
@@ -1692,7 +1735,8 @@ public void testExceptionWhenCommitAfterRebalance() throws Exception {
 			consumeLatch.countDown();
 			try {
 				Thread.sleep(3000);
-			} catch (InterruptedException e) {
+			}
+			catch (InterruptedException e) {
 				e.printStackTrace();
 			}
 		});
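
Below, a minimal usage sketch (not part of the diff above) of the SeekPosition-based TopicPartitionInitialOffset constructor that the new test exercises; the topic name "myTopic", the listener body, and the consumerFactory variable are illustrative assumptions:

    // Assign partition 0 of "myTopic" explicitly and seek to the earliest offset on startup;
    // SeekPosition.END would seek to the latest offset, and a Long argument sets an absolute offset.
    ContainerProperties props = new ContainerProperties(
            new TopicPartitionInitialOffset("myTopic", 0, SeekPosition.BEGINNING));
    props.setMessageListener((MessageListener<Integer, String>) record -> {
        // process each ConsumerRecord here
    });
    KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<>(consumerFactory, props); // assumes an existing ConsumerFactory<Integer, String>
    container.start();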