@@ -1,7 +1,6 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.testcontainers.FilteredTestContainerSlf4jLogConsumer;
@@ -87,7 +86,7 @@ protected String setupTopic(String name) {

protected void ensureTopic(String topic, int numPartitions) {
NewTopic e1 = new NewTopic(topic, numPartitions, (short) 1);
CreateTopicsResult topics = kcu.admin.createTopics(UniLists.of(e1));
CreateTopicsResult topics = kcu.getAdmin().createTopics(UniLists.of(e1));
try {
Void all = topics.all().get();
} catch (ExecutionException e) {
@@ -1,7 +1,6 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.Range;
@@ -33,13 +32,15 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase.defaultTimeout;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.UNORDERED;
import static java.time.Duration.ofMillis;
@@ -112,7 +113,6 @@ void offsetsOpenClose(OffsetEncoding encoding) {
.consumer(newConsumerOne)
.producer(producerOne)
.build();
kcu.props.put(ConsumerConfig.CLIENT_ID_CONFIG, "ONE-my-client");

// first client
{
@@ -183,8 +183,7 @@ void offsetsOpenClose(OffsetEncoding encoding) {
// second client
{
//
kcu.props.put(ConsumerConfig.CLIENT_ID_CONFIG, "THREE-my-client");
KafkaConsumer<String, String> newConsumerThree = kcu.createNewConsumer();
KafkaConsumer<String, String> newConsumerThree = kcu.createNewConsumer(customClientId("THREE-my-client"));
KafkaProducer<String, String> producerThree = kcu.createNewProducer(true);
var optionsThree = options.toBuilder().consumer(newConsumerThree).producer(producerThree).build();
try (var asyncThree = new ParallelEoSStreamProcessor<String, String>(optionsThree)) {
@@ -210,16 +209,22 @@ void offsetsOpenClose(OffsetEncoding encoding) {
OffsetSimultaneousEncoder.compressionForced = false;
}

private Properties customClientId(final String id) {
Properties properties = new Properties();
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
return properties;
}

private void send(String topic, int partition, Integer value) throws InterruptedException, ExecutionException {
RecordMetadata recordMetadata = kcu.producer.send(new ProducerRecord<>(topic, partition, value.toString(), value.toString())).get();
RecordMetadata recordMetadata = kcu.getProducer().send(new ProducerRecord<>(topic, partition, value.toString(), value.toString())).get();
}

private void send(int quantity, String topic, int partition) throws InterruptedException, ExecutionException {
log.debug("Sending {} messages to {}", quantity, topic);
var futures = new ArrayList<Future<RecordMetadata>>();
// async
for (Integer index : Range.range(quantity)) {
Future<RecordMetadata> send = kcu.producer.send(new ProducerRecord<>(topic, partition, index.toString(), index.toString()));
Future<RecordMetadata> send = kcu.getProducer().send(new ProducerRecord<>(topic, partition, index.toString(), index.toString()));
futures.add(send);
}
// block until finished
@@ -239,7 +244,8 @@ void correctOffsetVerySimple() {
setupTopic();

// send a single message
kcu.producer.send(new ProducerRecord<>(topic, "0", "0"));
String expectedPayload = "0";
kcu.getProducer().send(new ProducerRecord<>(topic, expectedPayload, expectedPayload));

KafkaConsumer<String, String> consumer = kcu.createNewConsumer();
KafkaProducer<String, String> producerOne = kcu.createNewProducer(true);
@@ -263,16 +269,15 @@ void correctOffsetVerySimple() {
// the single message is processed
await().untilAsserted(() -> assertThat(readByOne)
.extracting(ConsumerRecord::value)
.containsExactly("0"));
.containsExactly(expectedPayload));

} finally {
log.debug("asyncOne closed");
}

//
log.debug("Starting up new client");
kcu.props.put(ConsumerConfig.CLIENT_ID_CONFIG, "THREE-my-client");
KafkaConsumer<String, String> newConsumerThree = kcu.createNewConsumer();
KafkaConsumer<String, String> newConsumerThree = kcu.createNewConsumer(customClientId("THREE-my-client"));
KafkaProducer<String, String> producerThree = kcu.createNewProducer(true);
ParallelConsumerOptions<String, String> optionsThree = options.toBuilder()
.consumer(newConsumerThree)
@@ -345,7 +350,7 @@ void largeNumberOfMessagesSmallOffsetBitmap() {
});

// the single message is not processed
await().atMost(ofSeconds(10)).untilAsserted(() -> assertThat(readByOne.size())
await().atMost(defaultTimeout).untilAsserted(() -> assertThat(readByOne.size())
.isEqualTo(quantity - numberOfFailingMessages));

//
@@ -359,8 +364,7 @@ void largeNumberOfMessagesSmallOffsetBitmap() {
// step 2
{
//
kcu.props.put(ConsumerConfig.CLIENT_ID_CONFIG, "THREE-my-client");
KafkaConsumer<String, String> newConsumerThree = kcu.createNewConsumer();
KafkaConsumer<String, String> newConsumerThree = kcu.createNewConsumer(customClientId("THREE-my-client"));
KafkaProducer<String, String> producerThree = kcu.createNewProducer(true);
var optionsThree = baseOptions.toBuilder()
.consumer(newConsumerThree)
@@ -1,7 +1,6 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
@@ -39,7 +38,7 @@ public class KafkaSanityTests extends BrokerIntegrationTest<String, String> {
public void pausedConsumerStillLongPollsForNothing() {
log.info("Setup topic");
setupTopic();
KafkaConsumer<String, String> consumer = kcu.consumer;
KafkaConsumer<String, String> consumer = kcu.getConsumer();
log.info("Subscribe to topic");
consumer.subscribe(UniLists.of(topic));
Set<TopicPartition> assignment = consumer.assignment();
@@ -79,7 +78,7 @@ void offsetMetadataSpaceAvailable() {
.as("approximate sanity - ensure start state settings (shared static state :`( )")
.isGreaterThan(3000);

KafkaConsumer<String, String> consumer = kcu.consumer;
KafkaConsumer<String, String> consumer = kcu.getConsumer();
TopicPartition tpOne = new TopicPartition(topic, 0);
TopicPartition tpTwo = new TopicPartition(topic, 1);
HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
@@ -1,7 +1,6 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.ProgressBarUtils;
@@ -61,7 +60,7 @@ public void setupTestData() {
void timedNormalKafkaConsumerTest() {
setupTestData();
// subscribe in advance, it can be a few seconds
kcu.consumer.subscribe(UniLists.of(topic));
kcu.getConsumer().subscribe(UniLists.of(topic));

readRecordsPlainConsumer(total, topic);
}
@@ -128,7 +127,7 @@ private void readRecordsPlainConsumer(int total, String topic) {

Executors.newCachedThreadPool().submit(() -> {
while (allRecords.size() < total) {
ConsumerRecords<String, String> poll = kcu.consumer.poll(ofMillis(500));
ConsumerRecords<String, String> poll = kcu.getConsumer().poll(ofMillis(500));
log.info("Polled batch of {} messages", poll.count());

//save
@@ -177,7 +176,7 @@ private void publishMessages(int keyRange, int total, String topic) {
String value = RandomStringUtils.randomAlphabetic(messageSizeInBytes);
var producerRecord = new ProducerRecord<>(topic, key, value);
try {
var meta = kcu.producer.send(producerRecord);
var meta = kcu.getProducer().send(producerRecord);
futureMetadataResultsFromPublishing.add(meta);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.EnumCartesianProductTestSets;
@@ -226,7 +226,7 @@ private void runTest(int maxPoll, CommitMode commitMode, ProcessingOrder order,
// })
.alias(failureMessage)
.untilAsserted(() -> {
log.info("Processed-count: {}, Produced-count: {}", processedCount.get(), producedCount.get());
log.trace("Processed-count: {}, Produced-count: {}", processedCount.get(), producedCount.get());
int delta = producedCount.get() - processedCount.get();
if (delta == numThreads && pt.getRounds().get() > 1) {
log.error("Here we go fishy...");
@@ -1,9 +1,9 @@

package io.confluent.parallelconsumer.integrationTests.utils;
/*-
* Copyright (C) 2020 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/
package io.confluent.parallelconsumer.integrationTests.utils;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.admin.AdminClient;
@@ -26,56 +26,71 @@
public class KafkaClientUtils {

public static final int MAX_POLL_RECORDS = 10_000;
public static final String GROUP_ID_PREFIX = "group-1-";

private final KafkaContainer kContainer;
public Properties props = new Properties();

public KafkaConsumer<String, String> consumer;
@Getter
private KafkaConsumer<String, String> consumer;

@Getter
private KafkaProducer<String, String> producer;

public KafkaProducer<String, String> producer;
@Getter
private AdminClient admin;
private final String groupId = GROUP_ID_PREFIX + RandomUtils.nextInt();

public AdminClient admin;

public KafkaClientUtils(KafkaContainer kafkaContainer) {
kafkaContainer.addEnv("KAFKA_transaction_state_log_replication_factor", "1");
kafkaContainer.addEnv("KAFKA_transaction_state_log_min_isr", "1");
kafkaContainer.start();
this.kContainer = kafkaContainer;
setupProps();
}

public void setupProps() {
private Properties setupCommonProps() {
var commonProps = new Properties();
String servers = this.kContainer.getBootstrapServers();
commonProps.put("bootstrap.servers", servers);
return commonProps;
}

props.put("bootstrap.servers", servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1-" + RandomUtils.nextInt());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());
private Properties setupProducerProps() {
var producerProps = setupCommonProps();

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return producerProps;
}

private Properties setupConsumerProps() {
var consumerProps = setupCommonProps();

//
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) ofSeconds(10).toMillis()); // speed things up
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

//
// props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10);
// props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100);
// consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10);
// consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100);

// make sure we can download lots of records if they're small. Default is 500
// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1_000_000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
// consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1_000_000);
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);

return consumerProps;
}

@BeforeEach
public void open() {
log.info("Setting up clients...");
consumer = this.createNewConsumer();
producer = this.createNewProducer(false);
admin = AdminClient.create(props);
admin = AdminClient.create(setupCommonProps());
}

@AfterEach
@@ -96,28 +111,43 @@ public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup) {
return createNewConsumer(newConsumerGroup, new Properties());
}

public <K, V> KafkaConsumer<K, V> createNewConsumer(Properties options) {
return createNewConsumer(false, options);
}

public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup, Properties options) {
Properties properties = setupConsumerProps();

if (newConsumerGroup) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1-" + RandomUtils.nextInt()); // new group
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_PREFIX + RandomUtils.nextInt()); // new group
}
props.putAll(options);
KafkaConsumer<K, V> kvKafkaConsumer = new KafkaConsumer<>(props);

// override with custom
properties.putAll(options);

KafkaConsumer<K, V> kvKafkaConsumer = new KafkaConsumer<>(properties);
log.debug("New consume {}", kvKafkaConsumer);
return kvKafkaConsumer;
}

public <K, V> KafkaProducer<K, V> createNewProducer(boolean tx) {
Properties properties = setupProducerProps();

var txProps = new Properties();
txProps.putAll(props);
txProps.putAll(properties);

if (tx) {
// random number so we get a unique producer tx session each time. Normally wouldn't do this in production,
// but sometimes running in the test suite our producers step on each other between test runs and this causes
// Producer Fenced exceptions:
// Error looks like: Producer attempted an operation with an old epoch. Either there is a newer producer with
// the same transactionalId, or the producer's transaction has been expired by the broker.
txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.getClass().getSimpleName() + ":" + RandomUtils.nextInt()); // required for tx
txProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) ofSeconds(10).toMillis()); // speed things up

}
KafkaProducer<K, V> kvKafkaProducer = new KafkaProducer<>(txProps);

log.debug("New producer {}", kvKafkaProducer);
return kvKafkaProducer;
}
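For orientation, a minimal usage sketch of the refactored KafkaClientUtils API (illustrative only, not part of this diff): it assumes the accessors and overloads shown above (getAdmin(), createNewConsumer(Properties), createNewProducer(boolean)); the topic name and client id value are hypothetical.

    // Illustrative sketch -- per-consumer Properties override instead of mutating shared props
    Properties overrides = new Properties();
    overrides.put(ConsumerConfig.CLIENT_ID_CONFIG, "TWO-my-client");
    KafkaConsumer<String, String> consumer = kcu.createNewConsumer(overrides);
    KafkaProducer<String, String> producer = kcu.createNewProducer(true); // transactional, gets a unique transactional.id
    kcu.getAdmin().createTopics(UniLists.of(new NewTopic("example-topic", 1, (short) 1)));
    consumer.subscribe(UniLists.of("example-topic"));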
@@ -1,7 +1,6 @@
package io.confluent.parallelconsumer.examples.streams;

/*-
* Copyright (C) 2020 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
Expand Down Expand Up @@ -47,7 +46,7 @@ class StreamsAppUnderTest extends StreamsApp {

@Override
Consumer<String, String> getKafkaConsumer() {
return kcu.consumer;
return kcu.getConsumer();
}

@Override
2 changes: 1 addition & 1 deletion pom.xml
@@ -92,7 +92,7 @@

<!-- version numbers -->
<!-- plugins -->
<mycila.version>4.1</mycila.version>
<mycila.version>4.2.rc2</mycila.version>
<lombok.version>1.18.16</lombok.version>
<auto-service.version>1.0-rc7</auto-service.version>
<surefire.version>3.0.0-M5</surefire.version>