diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index 40ebe93fa..8184b56a9 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -12,11 +12,17 @@ endif::[]
 
 == Next Version
 
+== v0.5.1.0
+
+=== Features
+
+* #193: Pause / Resume PC (circuit breaker) without unsubscribing from topics
+
 == v0.5.0.1
 
 === Fixes and Improvements
 
-- fixes: #225 Build support for Java 17, 18 (#289)
+* fixes: #225 Build support for Java 17, 18 (#289)
 
 == v0.5.0.0
 
diff --git a/README.adoc b/README.adoc
index e265dd513..9aae114f4 100644
--- a/README.adoc
+++ b/README.adoc
@@ -250,6 +250,8 @@ without operational burden or harming the cluster's performance
 * Java 8 compatibility
 * Throttle control and broker liveliness management
 * Clean draining shutdown cycle
+* Manual Pause / Resume of entire PC without unsubscribing from topics (useful for implementing a simplistic https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern[circuit breaker])
+** Note: Pausing of a partition is also automatic, whenever back pressure has built up on a given partition
 
 //image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]
 //image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"]
@@ -475,7 +477,6 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc:
  * @param <K> key consume / produce key type
  * @param <V> value consume / produce value type
  * @see AbstractParallelEoSStreamProcessor
- * @see #poll(Consumer)
  */
----
 
@@ -1162,11 +1163,17 @@ endif::[]
 
 == Next Version
 
+== v0.5.1.0
+
+=== Features
+
+* #193: Pause / Resume PC (circuit breaker) without unsubscribing from topics
+
 == v0.5.0.1
 
 === Fixes and Improvements
 
-- fixes: #225 Build support for Java 17, 18 (#289)
+* fixes: #225 Build support for Java 17, 18 (#289)
 
 == v0.5.0.0
 
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java
index b3e8bc152..218f77fe8 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumer.java
@@ -8,9 +8,9 @@
 
 import io.confluent.parallelconsumer.internal.DrainingCloseable;
 import lombok.Data;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 import java.util.Collection;
-import java.util.function.Consumer;
 import java.util.regex.Pattern;
 
 // tag::javadoc[]
@@ -24,31 +24,60 @@
  * @param <K> key consume / produce key type
  * @param <V> value consume / produce value type
  * @see AbstractParallelEoSStreamProcessor
- * @see #poll(Consumer)
  */
 // end::javadoc[]
 public interface ParallelConsumer<K, V> extends DrainingCloseable {
 
     /**
-     * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+     * @see KafkaConsumer#subscribe(Collection)
      */
     void subscribe(Collection<String> topics);
 
     /**
-     * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern)
+     * @see KafkaConsumer#subscribe(Pattern)
      */
     void subscribe(Pattern pattern);
 
     /**
-     * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
+     * @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
      */
     void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
 
     /**
-     * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
+     * @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
      */
     void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
 
+    /**
+     * Pause this consumer (i.e. stop processing of messages).
+     * <p>
+     * This operation only has an effect if the consumer is currently running. In all other cases calling this method
+     * will be a silent no-op.
+     * <p>
+     * Once the consumer is paused, the system will stop submitting work to the processing pool. Already submitted in
+     * flight work however will be finished. This includes work that is currently being processed inside a user
+     * function as well as work that has already been submitted to the processing pool but has not been picked up by a
+     * free worker yet.
+     * <p>
+     * General remarks:
+     * <ul>
+     * <li>A paused consumer may still keep polling for new work until internal buffers are filled.</li>
+     * <li>This operation does not actively pause the subscription on the underlying Kafka Broker (compared to
+     * {@link KafkaConsumer#pause KafkaConsumer#pause}).</li>
+     * <li>Pending offset commits will still be performed when the consumer is paused.</li>
+     * </ul>
+     */
+    void pauseIfRunning();
+
+    /**
+     * Resume this consumer (i.e. continue processing of messages).
+     * <p>
+     * This operation only has an effect if the consumer is currently paused. In all other cases calling this method
+     * will be a silent no-op.
+     */
+    void resumeIfPaused();
+
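This pair of methods is what the simplistic circuit breaker called out in the README is built from. A minimal sketch of such wiring; the health probe and the five-second check interval are illustrative assumptions, not part of this change:

[source,java]
----
import io.confluent.parallelconsumer.ParallelConsumer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Poll an application-side health probe and flip the consumer between paused
// and running. Both calls are safe to repeat: each is a silent no-op unless
// the consumer is in the opposite state.
class PauseResumeCircuitBreaker {

    interface DownstreamHealthCheck { // hypothetical probe, not part of this PR
        boolean isHealthy();
    }

    static void wire(ParallelConsumer<String, String> pc, DownstreamHealthCheck health) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (health.isHealthy()) {
                pc.resumeIfPaused();
            } else {
                pc.pauseIfRunning();
            }
        }, 0, 5, TimeUnit.SECONDS);
    }
}
----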
     /**
      * A simple tuple structure.
      *
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
index e9486aab7..ff46569b3 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
@@ -23,7 +23,7 @@ public class ParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamP
         implements ParallelStreamProcessor<K, V> {
 
     /**
-     * Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which
+     * Construct the AsyncConsumer by wrapping this passed in consumer and producer, which can be configured any which
      * way as per normal.
      *
      * @see ParallelConsumerOptions
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java
index acf12eb0c..108627f42 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java
@@ -6,6 +6,7 @@
 
 import io.confluent.parallelconsumer.state.WorkContainer;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.experimental.Delegate;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
@@ -16,6 +17,7 @@
 /**
  * Internal only view on the {@link PollContext}.
  */
+@ToString
 public class PollContextInternal<K, V> {
 
     @Delegate
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java
index 44c1a04ed..1385116cb 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java
@@ -6,10 +6,12 @@
 
 import io.confluent.parallelconsumer.state.WorkContainer;
 import lombok.Getter;
+import lombok.ToString;
 
 /**
  * Internal only view of the {@link RecordContext} class.
  */
+@ToString
 public class RecordContextInternal<K, V> {
 
     @Getter
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
index a6f95eb97..5f186fb93 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -55,6 +55,12 @@ public abstract class AbstractParallelEoSStreamProcessor<K, V> implements Parall
 
     public static final String MDC_INSTANCE_ID = "pcId";
     public static final String MDC_OFFSET_MARKER = "offset";
 
+    /**
+     * Key for the work container descriptor that will be added to the {@link MDC diagnostic context} while inside a
+     * user function.
+     */
+    private static final String MDC_WORK_CONTAINER_DESCRIPTOR = "offset";
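For consumers of these MDC keys, the sketch below mirrors how the processor brackets work handling with put/remove; the logback pattern named in the comment is an assumption, not taken from this PR:

[source,java]
----
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

// Mirrors how the processor brackets work handling with MDC entries. A logback
// pattern such as "%X{pcId} %X{offset} %msg%n" would then render both keys
// (the pattern string is an assumption, not taken from this PR).
class MdcDescriptorExample {

    private static final Logger log = LoggerFactory.getLogger(MdcDescriptorExample.class);

    void handle(String workContainerDescriptor) {
        MDC.put("offset", workContainerDescriptor); // same key as MDC_WORK_CONTAINER_DESCRIPTOR
        try {
            log.debug("Handling work container");
        } finally {
            MDC.remove("offset"); // always clear thread-local MDC state
        }
    }
}
----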
+
     @Getter(PROTECTED)
     protected final ParallelConsumerOptions options;
 
@@ -664,7 +670,7 @@ private void controlLoop(Function<PollContextInternal<K, V>, List<R>> userFu
             log.trace("Loop: Process mailbox");
             processWorkCompleteMailBox();
 
-            if (state == running) {
+            if (isIdlingOrRunning()) {
                 // offsets will be committed when the consumer has its partitions revoked
                 log.trace("Loop: Maybe commit");
                 commitOffsetsMaybe();
@@ -947,9 +953,9 @@ private void processWorkCompleteMailBox() {
                 wm.registerWork(action.getConsumerRecords());
             } else {
                 WorkContainer<K, V> work = action.getWorkContainer();
-                MDC.put(MDC_OFFSET_MARKER, work.toString());
+                MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, work.toString());
                 wm.handleFutureResult(work);
-                MDC.remove(MDC_OFFSET_MARKER);
+                MDC.remove(MDC_WORK_CONTAINER_DESCRIPTOR);
             }
         }
     }
@@ -984,6 +990,11 @@ private Duration getTimeToBlockFor() {
         return effectiveCommitAttemptDelay;
     }
 
+    private boolean isIdlingOrRunning() {
+        return state == running || state == draining || state == paused;
+    }
+
+
     /**
      * Conditionally commit offsets to broker
      */
@@ -1045,7 +1056,7 @@ private boolean lingeringOnCommitWouldBeBeneficial() {
     private Duration getTimeToNextCommitCheck() {
         // draining is a normal running mode for the controller
-        if (state == running || state == draining) {
+        if (isIdlingOrRunning()) {
             Duration timeSinceLastCommit = getTimeSinceLastCheck();
             Duration timeBetweenCommits = getTimeBetweenCommits();
             @SuppressWarnings("UnnecessaryLocalVariable")
@@ -1065,6 +1076,7 @@ private Duration getTimeSinceLastCheck() {
     private void commitOffsetsThatAreReady() {
         log.debug("Committing offsets that are ready...");
         synchronized (commitCommand) {
+            log.debug("Committing offsets that are ready...");
             committer.retrieveOffsetsAndCommit();
             clearCommitCommand();
             this.lastCommitTime = Instant.now();
@@ -1086,7 +1098,7 @@ protected List<Tuple<ConsumerRecord<K, V>, R>> runUserFunct
         try {
             if (log.isDebugEnabled()) {
                 // first offset of the batch
-                MDC.put("offset", workContainerBatch.get(0).offset() + "");
+                MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, workContainerBatch.get(0).offset() + "");
             }
             log.trace("Pool received: {}", workContainerBatch);
 
@@ -1212,4 +1224,25 @@ private void clearCommitCommand() {
         }
     }
 
+    @Override
+    public void pauseIfRunning() {
+        if (this.state == State.running) {
+            log.info("Transitioning parallel consumer to state paused.");
+            this.state = State.paused;
+        } else {
+            log.debug("Skipping transition of parallel consumer to state paused. Current state is {}.", this.state);
+        }
+    }
+
+    @Override
+    public void resumeIfPaused() {
+        if (this.state == State.paused) {
+            log.info("Transitioning parallel consumer to state running.");
+            this.state = State.running;
+            notifySomethingToDo();
+        } else {
+            log.debug("Skipping transition of parallel consumer to state running. Current state is {}.", this.state);
+        }
+    }
+
 }
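The `isIdlingOrRunning()` gate introduced above is what keeps offset commits flowing while the controller is paused or draining. A reduced model of that gating; the State value set is an abbreviated assumption:

[source,java]
----
// Reduced model of the controller's commit gating. The point: paused and
// draining still allow offset commits, only work submission stops.
class CommitGateSketch {

    enum State { running, paused, draining, closed } // abbreviated assumption

    private volatile State state = State.running;

    boolean isIdlingOrRunning() {
        return state == State.running || state == State.draining || state == State.paused;
    }

    void controlLoopIteration(Runnable commitOffsetsMaybe) {
        if (isIdlingOrRunning()) {
            commitOffsetsMaybe.run(); // commits continue while paused
        }
    }
}
----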
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
index 3fc470eb7..b94151b9f 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
@@ -42,6 +42,11 @@ public class BrokerPollSystem<K, V> implements OffsetCommitter {
 
     private Optional<Future<Boolean>> pollControlThreadFuture;
 
+    /**
+     * While {@link io.confluent.parallelconsumer.internal.State#paused paused} is an externally controlled state that
+     * temporarily stops polling and work registration, the {@code paused} flag is used internally to pause
+     * subscriptions if polling needs to be throttled.
+     */
     @Getter
     private volatile boolean paused = false;
 
@@ -325,4 +330,36 @@ public void wakeupIfPaused() {
         if (paused)
             consumerManager.wakeup();
     }
+
+    /**
+     * Pause polling from the underlying Kafka Broker.
+     * <p>
+     * Note: If the poll system is currently not in state {@link io.confluent.parallelconsumer.internal.State#running
+     * running}, calling this method will be a no-op.
+     */
+    public void pausePollingAndWorkRegistrationIfRunning() {
+        if (this.state == State.running) {
+            log.info("Transitioning broker poll system to state paused.");
+            this.state = State.paused;
+        } else {
+            log.info("Skipping transition of broker poll system to state paused. Current state is {}.", this.state);
+        }
+    }
+
+    /**
+     * Resume polling from the underlying Kafka Broker.
+     * <p>
+     * Note: If the poll system is currently not in state {@link io.confluent.parallelconsumer.internal.State#paused
+     * paused}, calling this method will be a no-op.
+     */
+    public void resumePollingAndWorkRegistrationIfPaused() {
+        if (this.state == State.paused) {
+            log.info("Transitioning broker poll system to state running.");
+            this.state = State.running;
+        } else {
+            log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.state);
+        }
+    }
 }
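BrokerPollSystem now tracks two distinct pause notions, as the javadoc above describes. A sketch of how they might combine into a poll decision; the `shouldPoll()` combination is an illustrative assumption, the real class wires this through its control loop:

[source,java]
----
// Sketch of the two pause notions in BrokerPollSystem: the externally requested
// State.paused stops polling and work registration, while the internal volatile
// flag throttles the subscription when buffers fill. Names mirror the diff; the
// surrounding machinery is elided.
class PollGateSketch {

    enum State { running, paused }

    private volatile State state = State.running; // external pause/resume
    private volatile boolean paused = false;      // internal throttle flag

    boolean shouldPoll() {
        return state == State.running && !paused; // assumed combination
    }
}
----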
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/State.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/State.java
index 40670ddd7..1ea8dfed1 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/State.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/State.java
@@ -1,7 +1,7 @@
 package io.confluent.parallelconsumer.internal;
 
 /*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
  */
 
 /**
@@ -10,6 +10,13 @@
 public enum State {
     unused,
     running,
+    /**
+     * When paused, the system will stop submitting work to the processing pool. Polling for new work however may
+     * continue until internal buffers have been filled sufficiently and the auto-throttling takes effect.
+     * In flight work will not be affected by transitioning to this state (i.e. processing will finish without any
+     * interrupts being sent).
+     */
+    paused,
     /**
      * When draining, the system will stop polling for more records, but will attempt to process all already downloaded
      * records. Note that if you choose to close without draining, records already processed will still be committed
diff --git a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java
index 7ebeda8f1..048351cf3 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java
@@ -97,16 +97,19 @@ public void assertCommits(MockProducer mp, List<Integer> expectedOffsets, Option
     }
 
     public List<Integer> getProducerCommitsFlattened(MockProducer mp) {
+        return getProducerCommitsMeta(mp).stream().map(x -> (int) x.offset()).collect(Collectors.toList());
+    }
+
+    public List<OffsetAndMetadata> getProducerCommitsMeta(MockProducer mp) {
         List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> history = mp.consumerGroupOffsetsHistory();
 
-        List<Integer> set = history.stream().flatMap(histories -> {
+        List<OffsetAndMetadata> set = history.stream().flatMap(histories -> {
             // get all partition offsets and flatten
-            var results = new ArrayList<Integer>();
+            ArrayList<OffsetAndMetadata> results = new ArrayList<>();
             var group = histories.get(CONSUMER_GROUP_ID);
             for (var partitionOffsets : group.entrySet()) {
                 OffsetAndMetadata commit = partitionOffsets.getValue();
-                int offset = (int) commit.offset();
-                results.add(offset);
+                results.add(commit);
             }
             return results.stream();
         }).collect(Collectors.toList()); // set - ignore repeated commits ({@link OffsetMap})
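A hypothetical call site for the new metadata-preserving accessor, assuming a test holding a `KafkaTestUtils` instance and a producer spy as in the test base below:

[source,java]
----
import io.confluent.csid.utils.KafkaTestUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;

import java.util.List;

import static com.google.common.truth.Truth.assertThat;

// getProducerCommitsMeta() keeps full OffsetAndMetadata values instead of
// flattening them to int offsets, so commit metadata can be asserted on directly.
class CommitMetaUsageSketch {

    void verifyCommits(KafkaTestUtils ktu, MockProducer<String, String> producerSpy) {
        List<OffsetAndMetadata> commits = ktu.getProducerCommitsMeta(producerSpy);
        assertThat(commits).isNotEmpty(); // metadata-level assertions are now possible
    }
}
----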
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
index 6733bbcc1..dc5d8e968 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
@@ -9,8 +9,10 @@
 
 import io.confluent.csid.utils.LongPollingMockConsumer;
 import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
 import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
+import io.confluent.parallelconsumer.model.CommitHistory;
 import io.confluent.parallelconsumer.state.WorkContainer;
 import io.confluent.parallelconsumer.state.WorkManager;
+import io.confluent.parallelconsumer.truth.CommitHistorySubject;
 import io.confluent.parallelconsumer.truth.LongPollingMockConsumerSubject;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -31,7 +33,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static io.confluent.csid.utils.LatchTestUtils.awaitLatch;
 import static io.confluent.csid.utils.StringUtils.msg;
@@ -335,12 +336,18 @@ public void assertCommitsContains(List<Integer> offsets) {
         assertThat(commits).containsAll(offsets);
     }
 
-    private List<Integer> getCommitHistoryFlattened() {
+    protected List<Integer> getCommitHistoryFlattened() {
         return (isUsingTransactionalProducer())
                 ? ktu.getProducerCommitsFlattened(producerSpy)
                 : extractAllPartitionsOffsetsSequentially(false);
     }
 
+    private List<OffsetAndMetadata> getCommitHistoryFlattenedMeta() {
+        return (isUsingTransactionalProducer())
+                ? ktu.getProducerCommitsMeta(producerSpy)
+                : extractAllPartitionsOffsetsSequentiallyMeta(true);
+    }
+
     public void assertCommits(List<Integer> offsets, String description) {
         assertCommits(offsets, Optional.of(description));
     }
@@ -374,17 +381,25 @@ public void assertCommits(List<Integer> offsets, Optional<String> description) {
      * Flattens the offsets of all partitions into a single sequential list
      */
     protected List<Integer> extractAllPartitionsOffsetsSequentially(boolean trimGenesis) {
+        return extractAllPartitionsOffsetsSequentiallyMeta(trimGenesis).stream()
+                .map(x -> (int) x.offset()) // int cast a luxury in test context - no big offsets
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Flattens the offsets of all partitions into a single sequential list
+     */
+    protected List<OffsetAndMetadata> extractAllPartitionsOffsetsSequentiallyMeta(boolean trimGenesis) {
         // copy the list for safe concurrent access
         List<Map<TopicPartition, OffsetAndMetadata>> history = new ArrayList<>(consumerSpy.getCommitHistoryInt());
         return history.stream()
                 .flatMap(commits -> {
-                            Collection<OffsetAndMetadata> values = new ArrayList<>(commits.values()); // 4 debugging
-                            Stream<Integer> rawOffsets = values.stream().map(meta -> (int) meta.offset());
+                            var rawValues = new ArrayList<>(commits.values()).stream(); // 4 debugging
                             if (trimGenesis)
-                                return rawOffsets.filter(x -> x != 0);
+                                return rawValues.filter(x -> x.offset() != 0);
                             else
-                                return rawOffsets; // int cast a luxury in test context - no big offsets
+                                return rawValues;
                         }
                 ).collect(Collectors.toList());
     }
@@ -406,6 +421,12 @@ public void assertCommits(List<Integer> offsets) {
         assertCommits(offsets, Optional.empty());
     }
 
+    public CommitHistorySubject assertCommits() {
+        List<OffsetAndMetadata> commitHistoryFlattened = getCommitHistoryFlattenedMeta();
+        CommitHistory actual = new CommitHistory(commitHistoryFlattened);
+        return CommitHistorySubject.assertThat(actual);
+    }
+
     /**
      * Checks a list of commits of a list of partitions - outer list is partition, inner list is commits
      */
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorPauseResumeTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorPauseResumeTest.java
new file mode 100644
index 000000000..5bf8f834e
--- /dev/null
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorPauseResumeTest.java
@@ -0,0 +1,232 @@
+package io.confluent.parallelconsumer;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import static com.google.common.truth.Truth.assertThat;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.UNORDERED;
+
+/**
+ * Test for the pause/resume feature of the parallel consumer (see {@code GH#193}).
+ *
+ * @author niels.oertel
+ */
+@Slf4j
+class ParallelEoSStreamProcessorPauseResumeTest extends ParallelEoSStreamProcessorTestBase {
+
+    private static final AtomicInteger MY_ID_GENERATOR = new AtomicInteger();
+
+    private static final AtomicInteger RECORD_SET_KEY_GENERATOR = new AtomicInteger();
+
+    private static class TestUserFunction implements Consumer<PollContext<String, String>> {
+
+        private final AtomicInteger numProcessedRecords = new AtomicInteger();
+
+        /**
+         * The number of in flight records. Note that this may not exactly match the real number of in flight records,
+         * as parallel consumer has a wrapper around the user function, so incrementing/decrementing the counter is a
+         * little bit delayed.
+         */
+        private final AtomicInteger numInFlightRecords = new AtomicInteger();
+
+        private final ReentrantLock mutex = new ReentrantLock();
+
+        public void lockProcessing() {
+            mutex.lock();
+        }
+
+        public void unlockProcessing() {
+            log.info("Unlocking processing");
+            mutex.unlock();
+        }
+
+        @Override
+        public void accept(PollContext<String, String> t) {
+            log.debug("Received: {}", t);
+            numInFlightRecords.incrementAndGet();
+            try {
+                lockProcessing();
+                int numProcessed = numProcessedRecords.incrementAndGet();
+                log.info("Processing complete, incremented to {}", numProcessed);
+            } finally {
+                unlockProcessing();
+                numInFlightRecords.decrementAndGet();
+            }
+        }
+
+        public void reset() {
+            numProcessedRecords.set(0);
+        }
+    }
+
+    private ParallelConsumerOptions<String, String> getBaseOptions(final CommitMode commitMode, final int maxConcurrency) {
+        return ParallelConsumerOptions.<String, String>builder()
+                .commitMode(commitMode)
+                .consumer(consumerSpy)
+                // UNORDERED so that we get nice linear offsets in our processing order (PARTITION has no concurrency; KEY depends on your keys)
+                .ordering(UNORDERED)
+                .maxConcurrency(maxConcurrency)
+                .build();
+    }
+
+    private void addRecordsWithSetKey(final int numRecords) {
+        long recordSetKey = RECORD_SET_KEY_GENERATOR.incrementAndGet();
+        log.debug("Producing {} records with set key {}.", numRecords, recordSetKey);
+        for (int i = 0; i < numRecords; ++i) {
+            consumerSpy.addRecord(ktu.makeRecord("key-" + recordSetKey + i, "v0-test-" + i));
+        }
+        log.debug("Finished producing {} records with set key {}.", numRecords, recordSetKey);
+    }
+
+    private void setupParallelConsumerInstance(final CommitMode commitMode, final int maxConcurrency) {
+        setupParallelConsumerInstance(getBaseOptions(commitMode, maxConcurrency));
+
+        // register unique ID on the parallel consumer
+        String myId = "p/r-test-" + MY_ID_GENERATOR.incrementAndGet();
+        parallelConsumer.setMyId(Optional.of(myId));
+    }
+
+    private TestUserFunction createTestSetup(final CommitMode commitMode, final int maxConcurrency) {
+        setupParallelConsumerInstance(commitMode, maxConcurrency);
+        TestUserFunction testUserFunction = new TestUserFunction();
+        parallelConsumer.poll(testUserFunction);
+
+        return testUserFunction;
+    }
+
+    /**
+     * This test verifies that no new records are submitted to the workers once the consumer is paused.
+     *
+     * @param commitMode The commit mode to be configured for the parallel consumer.
+     */
+    @ParameterizedTest()
+    @EnumSource(CommitMode.class)
+    @SneakyThrows
+    void pausingAndResumingProcessingShouldWork(final CommitMode commitMode) {
+        int numTestRecordsPerSet = 1_000;
+        int totalRecordsExpected = 2 * numTestRecordsPerSet;
+
+        TestUserFunction testUserFunction = createTestSetup(commitMode, 3);
+
+        // produce some messages
+        addRecordsWithSetKey(numTestRecordsPerSet);
+
+        // wait for processing to finish
+        Awaitility
+                .waitAtMost(defaultTimeout)
+                .alias(numTestRecordsPerSet + " records should be processed")
+                .untilAsserted(() -> assertThat(testUserFunction.numProcessedRecords.get()).isEqualTo(numTestRecordsPerSet));
+
+        // overall committed offset should reach the same value
+        awaitForCommit(numTestRecordsPerSet);
+
+        //
+        testUserFunction.reset();
+
+        // pause parallel consumer and wait for control loops to catch up
+        parallelConsumer.pauseIfRunning();
+
+        awaitForOneLoopCycle();
+
+        // produce more messages -> nothing should be processed
+        addRecordsWithSetKey(numTestRecordsPerSet);
+
+        awaitForSomeLoopCycles(2);
+
+        // shouldn't have processed any records
+        assertThat(testUserFunction.numProcessedRecords.get()).isEqualTo(0L);
+
+        // overall committed offset should stay at old value
+        awaitForCommit(numTestRecordsPerSet);
+
+        // resume parallel consumer
+        parallelConsumer.resumeIfPaused();
+
+        // messages should be processed now
+        Awaitility
+                .waitAtMost(defaultTimeout)
+                .alias(numTestRecordsPerSet + " records should be processed")
+                .untilAsserted(() -> assertThat(testUserFunction.numProcessedRecords.get()).isEqualTo(numTestRecordsPerSet));
+
+        // overall committed offset should reach the total of two batches that were processed
+        awaitForCommit(totalRecordsExpected);
+    }
+
+    /**
+     * This test verifies that in flight work is finished successfully when the consumer is paused. In flight work is
+     * work that's currently being processed inside a user function or has already been submitted to be processed based
+     * on the dynamic load factor. The test also verifies that new offsets are committed once the in-flight work
+     * finishes, even if the consumer is still paused.
+     *
+     * @param commitMode The commit mode to be configured for the parallel consumer.
+     */
+    @ParameterizedTest()
+    @EnumSource(CommitMode.class)
+    @SneakyThrows
+    void testThatInFlightWorkIsFinishedSuccessfullyAndOffsetsAreCommitted(final CommitMode commitMode) {
+        int degreeOfParallelism = 3;
+        int numTestRecordsPerSet = 1_000;
+
+        TestUserFunction testUserFunction = createTestSetup(commitMode, degreeOfParallelism);
+        // block processing in the user function to ensure we have in flight work once we pause the consumer
+        testUserFunction.lockProcessing();
+
+        // produce some messages
+        addRecordsWithSetKey(numTestRecordsPerSet);
+
+        // wait until we have enough records in flight
+        Awaitility
+                .waitAtMost(defaultTimeout)
+                .alias(degreeOfParallelism + " records should be in flight")
+                .untilAsserted(() -> assertThat(testUserFunction.numInFlightRecords.get()).isEqualTo(degreeOfParallelism));
+
+        //
+        assertCommits().isEmpty();
+
+        // pause parallel consumer and wait for control loops to catch up
+        parallelConsumer.pauseIfRunning();
+        awaitForOneLoopCycle();
+
+        // unlock the user function
+        testUserFunction.unlockProcessing();
+
+        // in flight messages + buffered messages should get processed now (exact number is based on dynamic load factor)
+        Awaitility
+                .waitAtMost(defaultTimeout)
+                .alias("at least " + degreeOfParallelism + " records should be processed")
+                .untilAsserted(() -> assertThat(testUserFunction.numProcessedRecords.get()).isGreaterThan(degreeOfParallelism));
+
+        // overall committed offset should reach the number of records processed so far
+        awaitForCommit(testUserFunction.numProcessedRecords.get());
+
+        // shouldn't have any more in flight records now
+        assertThat(testUserFunction.numInFlightRecords.get()).isEqualTo(0);
+        assertThat(parallelConsumer.getWm().getNumberRecordsOutForProcessing()).isEqualTo(0);
+
+        // resume parallel consumer
+        parallelConsumer.resumeIfPaused();
+
+        // other pending messages should be processed now
+        Awaitility
+                .waitAtMost(defaultTimeout)
+                .alias(numTestRecordsPerSet + " records should be processed")
+                .untilAsserted(() -> assertThat(testUserFunction.numProcessedRecords.get()).isEqualTo(numTestRecordsPerSet));
+
+        // overall committed offset should reach the total number of processed records
+        awaitForCommit(numTestRecordsPerSet);
+    }
+
+}
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/truth/CommitHistorySubject.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/truth/CommitHistorySubject.java
index 3f95cb44f..a1ad3db8a 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/truth/CommitHistorySubject.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/truth/CommitHistorySubject.java
@@ -52,4 +52,13 @@ public void offset(long quantity) {
     public void anything() {
         check("commits()").that(actual.getOffsetHistory()).isNotEmpty();
     }
+
+    public void nothing() {
+        check("commits()").that(actual.getOffsetHistory()).isEmpty();
+    }
+
+    public void isEmpty() {
+        nothing();
+    }
+
 }
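At a call site, the new subject methods read as follows, assuming `CommitHistory` wraps a list of `OffsetAndMetadata` as in the test base changes above:

[source,java]
----
import io.confluent.parallelconsumer.model.CommitHistory;

import java.util.Collections;

import static io.confluent.parallelconsumer.truth.CommitHistorySubject.assertThat;

// The new isEmpty() is sugar for nothing(), the inverse of the pre-existing
// anything().
class CommitHistoryAssertionSketch {

    void example() {
        assertThat(new CommitHistory(Collections.emptyList())).isEmpty();
    }
}
----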
diff --git a/parallel-consumer-core/src/test/resources/logback-test.xml b/parallel-consumer-core/src/test/resources/logback-test.xml
index e7d4e3b4f..4d892f953 100644
--- a/parallel-consumer-core/src/test/resources/logback-test.xml
+++ b/parallel-consumer-core/src/test/resources/logback-test.xml
@@ -21,15 +21,14 @@
@@ -61,6 +60,10 @@
diff --git a/parallel-consumer-vertx/src/test/resources/logback-test.xml b/parallel-consumer-vertx/src/test/resources/logback-test.xml
index e3e153db2..d8d564c02 100644
--- a/parallel-consumer-vertx/src/test/resources/logback-test.xml
+++ b/parallel-consumer-vertx/src/test/resources/logback-test.xml
@@ -3,7 +3,7 @@
     Copyright (C) 2020-2022 Confluent, Inc.
 -->