-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Wait for partitions to be assigned when consuming from embedded topics #538
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Wait for partitions to be assigned when consuming from embedded topics #538
Conversation
@elliotkennedy Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@elliotkennedy Thank you for signing the Contributor License Agreement! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, @elliotkennedy .
This is good one!
Would you mind including your test-case to the project?
I agree that we nowhere use that consumeFromAllEmbeddedTopics()
and your fix in addition guaranties that all our tests with the consumeFromAnEmbeddedTopic()
usage are robust, but I see that your code is pretty good to be included to the project for more coverage, especially for Kafka Streams.
Also add your name to the author list for all affected classes and consider to update Copyright to the current 2018
.
Otherwise let us know and we can do all of that for you. But let us borrow your test-case anyway 😄
Thanks
…ming from embedded topics - Add a test to consume from embedded topics. - Updated authors and copywrite.
Thanks for the feedback @artembilan - I've updated as requested but I wasn't sure where to put the test. I dropped it in the tests project but that creates a dependency on the main project. Take a look and let me know if you'd like to move it somewhere else 👍 The test polling loop could be made smarter too, open to suggestions on that if you think it might be flaky. Thanks, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing critical, but some comments and suggestions if you are still open for discussion and further improvements in your contribution.
Thank you anyway!
build.gradle
Outdated
compile ("org.hamcrest:hamcrest-all:$hamcrestVersion", optional) | ||
compile ("org.assertj:assertj-core:$assertjVersion", optional) | ||
|
||
testCompile project(':spring-kafka') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, your test definitely should be in the spring-kafka
module, in the test
directory. And I think the org.springframework.kafka.kstream
package should be good candidate for the place.
* @author Elliot Kennedy | ||
*/ | ||
@RunWith(SpringRunner.class) | ||
public class KafkaEmbeddedStreamTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think structure of the test-case is good enough.
We need @DirtiesContext
on the class level to clear the application context after the test execution.
There is an @EmbeddedKafka
instead of @ClassRule
. Also I think that kafkaTemplate
can be declared as a @Bean
as well.
For the Kafka Streams we have an @EnableKafkaStreams
as well.
Just consider to use all of them in your test or leave as is and we polish on merge!
But yeah... would be better to move the test to the appropriate module.
…ming from embedded topics - Move test and use spring test utils better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool!
Will review one more time in the morning and merge!
I even think we will back-port it up to 1.3.x
.
Cool, thanks! I'm slightly worried there is a race condition in |
Merged as 6b74d8c and cherry-picked to Regarding Thank you for contribution here. Looking forward for more! |
Created issue #539
I think there is a bug in
EmbeddedKafka.consumeFromAnEmbeddedTopic(Consumer consumer, String topic)
. It does not wait to be assigned partitions from the embedded topic as is the case with
EmbeddedKafka.onsumeFromAllEmbeddedTopics(Consumer consumer)
.I created a test, which fails without this change here.
When a consumer is created with
EmbeddedKafka.consumeFromAllEmbeddedTopics(Consumer consumer)
, all the messages are received.When a consumer is created with
EmbeddedKafka.consumeFromAnEmbeddedTopic(Consumer consumer, String topic)
, the messages are not all received.