Skip to content

Commit d4aa01d

Browse files
committed
minor: Set Serdes for MockProducer for AK 2.7 partition fix KAFKA-10503 to fix new NPE
Since 2.7 Serdes are now used in the MockProducer’s #send method, which causes a NPE with AK 2.7 if they’re not set.
1 parent 97d09fc commit d4aa01d

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTestBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.kafka.clients.consumer.*;
1313
import org.apache.kafka.clients.producer.MockProducer;
1414
import org.apache.kafka.common.TopicPartition;
15+
import org.apache.kafka.common.serialization.Serdes;
1516
import org.junit.jupiter.api.BeforeEach;
1617
import pl.tlinkowski.unij.api.UniLists;
1718
import pl.tlinkowski.unij.api.UniMaps;
@@ -126,7 +127,8 @@ protected MockConsumer<String, String> setupClients() {
126127

127128
protected void instantiateConsumerProducer() {
128129
LongPollingMockConsumer<String, String> consumer = new LongPollingMockConsumer<>(OffsetResetStrategy.EARLIEST);
129-
MockProducer<String, String> producer = new MockProducer<>(true, null, null); // TODO do async testing
130+
MockProducer<String, String> producer = new MockProducer<>(true,
131+
Serdes.String().serializer(), Serdes.String().serializer()); // TODO do async testing
130132

131133
this.producerSpy = spy(producer);
132134
this.consumerSpy = spy(consumer);

parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/CoreAppTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.apache.kafka.clients.producer.MockProducer;
1515
import org.apache.kafka.clients.producer.Producer;
1616
import org.apache.kafka.common.TopicPartition;
17+
import org.apache.kafka.common.serialization.Serdes;
18+
import org.apache.kafka.common.serialization.Serializer;
1719
import org.awaitility.Awaitility;
1820
import org.junit.jupiter.api.Test;
1921
import org.mockito.Mockito;
@@ -81,7 +83,8 @@ Consumer<String, String> getKafkaConsumer() {
8183

8284
@Override
8385
Producer<String, String> getKafkaProducer() {
84-
return new MockProducer<>(true, null, null);
86+
var stringSerializer = Serdes.String().serializer();
87+
return new MockProducer<>(true, stringSerializer, stringSerializer);
8588
}
8689

8790
@Override

0 commit comments

Comments
 (0)