diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java
new file mode 100644
index 000000000..7fe1a3ad7
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java
@@ -0,0 +1,23 @@
+package io.confluent.parallelconsumer;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import lombok.experimental.StandardException;
+
+/**
+ * A user's processing function can throw this exception to signal to PC that processing of the message has failed,
+ * and that it should be retried at a later time.
+ *
+ * The advantage of throwing this exception explicitly is that PC will not log an ERROR. If any other type of exception
+ * is thrown by the user's function, it will be logged as an error (but the record will still be retried later).
+ *
+ * In short: if this exception is thrown, nothing will be logged (except at DEBUG level); any other exception will be
+ * logged as an error.
+ *
+ * @author Antony Stubbs
+ */
+@StandardException
+public class PCRetriableException extends RuntimeException {
+}
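For illustration, a user function opts into these quiet retries simply by throwing the new exception. A minimal sketch (the topic name and tryProcess helper are hypothetical):

```java
// assumes `pc` is an already-configured ParallelStreamProcessor<String, String>
pc.subscribe(UniLists.of("input-topic")); // hypothetical topic
pc.poll(context -> {
    boolean ok = tryProcess(context.getSingleRecord().getConsumerRecord().value()); // hypothetical helper
    if (!ok) {
        // logged at DEBUG only; PC will redeliver the record later, per the configured retry delay
        throw new PCRetriableException("downstream dependency unavailable, retry later");
    }
});
```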
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
index 78df7a154..029b16f7c 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
@@ -47,7 +47,7 @@ public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
}
@Override
- public void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction) {
+ public void poll(UserFunctions.Processor<K, V> usersVoidConsumptionFunction) {
Function<PollContextInternal<K, V>, List<Object>> wrappedUserFunc = (context) -> {
log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context);
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java
index 5837631f8..4de6a3415 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.UserFunctions.Transformer;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import lombok.Data;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -27,21 +28,20 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(Paralle
}
/**
- * Register a function to be applied in parallel to each received message
+ * Register a function to be applied in parallel to each received message.
*
* @param usersVoidConsumptionFunction the function
*/
// todo why isn't this in ParallelConsumer ?
- void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);
-
+ void poll(UserFunctions.Processor<K, V> usersVoidConsumptionFunction);
/**
- * Register a function to be applied in parallel to each received message, which in turn returns one or more {@link
- * ProducerRecord}s to be sent back to the broker.
+ * Register a function to be applied in parallel to each received message, which in turn returns one or more
+ * {@link ProducerRecord}s to be sent back to the broker.
*
* @param callback applied after the produced message is acknowledged by kafka
*/
- void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction,
+ void pollAndProduceMany(Transformer<K, V> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback);
/**
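Because both replacement types are functional interfaces, existing lambda call sites keep compiling unchanged. A sketch (topic name and handler bodies are illustrative):

```java
pc.poll(context -> process(context.getSingleRecord().getConsumerRecord())); // now targets UserFunctions.Processor

pc.pollAndProduceMany(
        context -> UniLists.of(new ProducerRecord<>("output-topic", context.getSingleRecord().getConsumerRecord().value())),
        result -> log.debug("produce acknowledged: {}", result));
```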
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/UserFunctions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/UserFunctions.java
new file mode 100644
index 000000000..9d01cb1a3
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/UserFunctions.java
@@ -0,0 +1,50 @@
+package io.confluent.parallelconsumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.List;
+
+/**
+ * Types of user functions used for processing records.
+ *
+ * @author Antony Stubbs
+ */
+public interface UserFunctions {
+
+ /**
+ * Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
+ */
+ @FunctionalInterface
+ interface Processor<K, V> { // extends java.util.function.Consumer<PollContext<K, V>>
+
+ /**
+ * Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
+ *
+ * The user can throw a {@link PCRetriableException} to signal that an issue occurred and that PC should handle
+ * retrying the record later. If an exception is thrown that doesn't extend {@link PCRetriableException}, the
+ * error will be logged at {@code ERROR} level. Note that, by default, any exception thrown from a user's
+ * function will cause the record to be retried, as if a {@link PCRetriableException} had actually been thrown.
+ *
+ * @param records the Kafka records to process
+ * @see PCRetriableException
+ * @see ParallelConsumerOptions#getRetryDelayProvider()
+ * @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
+ */
+ void process(PollContext<K, V> records);
+ }
+
+ @FunctionalInterface
+ interface Transformer<K, V> { // extends java.util.function.Function<PollContext<K, V>, List<ProducerRecord<K, V>>>
+
+ /**
+ * Like {@link Processor#process(PollContext)} but also returns records to be produced back to Kafka.
+ *
+ * @param records the Kafka records to process
+ * @return the function result
+ * @see Processor#process(PollContext)
+ */
+ List<ProducerRecord<K, V>> flatMap(PollContext<K, V> records);
+
+ }
+}
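A Transformer can also fan out, mapping one consumed record to several produced records. A sketch under the same assumptions as above (standard java.util imports assumed; the topic and tokenising logic are hypothetical):

```java
UserFunctions.Transformer<String, String> splitter = context -> {
    var value = context.getSingleRecord().getConsumerRecord().value();
    // hypothetical fan-out: one output record per comma-separated token
    return Arrays.stream(value.split(","))
            .map(token -> new ProducerRecord<String, String>("tokens-topic", token))
            .collect(Collectors.toList());
};
pc.pollAndProduceMany(splitter, result -> log.debug("sent: {}", result));
```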
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
index c78ad1769..8f3b2bbcb 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -5,10 +5,7 @@
*/
import io.confluent.csid.utils.TimeUtils;
-import io.confluent.parallelconsumer.ExceptionInUserFunctionException;
-import io.confluent.parallelconsumer.ParallelConsumer;
-import io.confluent.parallelconsumer.ParallelConsumerOptions;
-import io.confluent.parallelconsumer.PollContextInternal;
+import io.confluent.parallelconsumer.*;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.*;
@@ -1174,7 +1171,15 @@ protected List<Tuple<ConsumerRecord<K, V>, R>> runUserFunct
return intermediateResults;
} catch (Exception e) {
// handle fail
- log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox. Context: {}", context, e);
+ var cause = e.getCause();
+ String msg = msg("Exception caught in user function running stage, registering WC as failed, returning to" +
+ " mailbox. Context: {}", context, e);
+ if (cause instanceof PCRetriableException) {
+ log.debug("Explicit " + PCRetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e);
+ } else {
+ log.error(msg, e);
+ }
+
for (var wc : workContainerBatch) {
wc.onUserFunctionFailure(e);
addToMailbox(context, wc); // always add on error
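The new branch checks e.getCause() rather than e itself because user exceptions surface wrapped: the carefullyRun(...) wrappers rethrow anything thrown by the user's function inside an ExceptionInUserFunctionException. A sketch of that flow, under that assumption:

```java
try {
    UserFunctions.carefullyRun(userProcessor, pollContext); // wraps any Throwable from the user function
} catch (ExceptionInUserFunctionException wrapped) {
    // wrapped.getCause() is the user's original exception, e.g. a PCRetriableException
    boolean quietRetry = wrapped.getCause() instanceof PCRetriableException;
}
```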
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java
index d156175a0..f6cf85e24 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java
@@ -5,14 +5,17 @@
*/
import io.confluent.parallelconsumer.ExceptionInUserFunctionException;
+import io.confluent.parallelconsumer.PollContext;
+import io.confluent.parallelconsumer.UserFunctions.Processor;
import lombok.experimental.UtilityClass;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.function.Function;
/**
* Single entry point for wrapping the actual execution of user functions
+ *
+ * @author Antony Stubbs
*/
@UtilityClass
public class UserFunctions {
@@ -43,7 +46,7 @@ public static <PARAM1, PARAM2, RESULT> RESULT carefullyRun(BiFunction
- public static <PARAM, RESULT> RESULT carefullyRun(Function<PARAM, RESULT> wrappedFunction, PARAM userFuncParam) {
+ public static <K, V, RESULT> RESULT carefullyRun(Function<PollContext<K, V>, RESULT> wrappedFunction, PollContext<K, V> userFuncParam) {
try {
return wrappedFunction.apply(userFuncParam);
} catch (Throwable e) {
@@ -56,9 +59,9 @@ public static <PARAM, RESULT> RESULT carefullyRun(Function<PARAM, RESULT> wrappe
* @param wrappedFunction the function to run
* @param userFuncParam the parameter to pass into the user's function
*/
- public static <PARAM> void carefullyRun(Consumer<PARAM> wrappedFunction, PARAM userFuncParam) {
+ public static <K, V> void carefullyRun(Processor<K, V> wrappedFunction, PollContext<K, V> userFuncParam) {
try {
- wrappedFunction.accept(userFuncParam);
+ wrappedFunction.process(userFuncParam);
} catch (Throwable e) {
throw new ExceptionInUserFunctionException(MSG, e);
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index b3e8309e5..fb4afea85 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -301,11 +301,13 @@ public void addNewIncompleteRecord(ConsumerRecord<K, V> record) {
/**
* If the offset is higher than expected, according to the previously committed / polled offset, truncate up to it.
- * Offsets between have disappeared and will never be polled again.
+ * If lower, reset down to it.
*
- * Only runs if this is the first {@link WorkContainer} to be added since instantiation.
+ * Only runs if this is the first {@link ConsumerRecord} to be added since instantiation.
+ *
+ * Can be caused by the offset reset policy of the underlying consumer.
*/
- private void maybeTruncateBelowOrAbove(long polledOffset) {
+ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
if (bootstrapPhase) {
bootstrapPhase = false;
} else {
@@ -313,30 +315,33 @@ private void maybeTruncateBelowOrAbove(long polledOffset) {
return;
}
- long expectedBootstrapRecordOffset = getNextExpectedInitialPolledOffset();
+ // during bootstrap, getOffsetToCommit() returns the next expected offset (the offset that would be
+ // committed), so we can use it to determine whether we need to truncate
+ long expectedBootstrapRecordOffset = getOffsetToCommit();
- boolean pollAboveExpected = polledOffset > expectedBootstrapRecordOffset;
+ boolean pollAboveExpected = bootstrapPolledOffset > expectedBootstrapRecordOffset;
- boolean pollBelowExpected = polledOffset < expectedBootstrapRecordOffset;
+ boolean pollBelowExpected = bootstrapPolledOffset < expectedBootstrapRecordOffset;
if (pollAboveExpected) {
- // previously committed offset record has been removed, or manual reset to higher offset detected
- log.warn("Truncating state - removing records lower than {}. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled {} but " +
- "expected {} from loaded commit data. Could be caused by record retention or compaction.",
- polledOffset,
- polledOffset,
+ // previously committed offset record has been removed from the topic, so we need to truncate up to it
+ log.warn("Truncating state - removing records lower than {}. Offsets have been removed from the partition " +
+ "by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " +
+ "Could be caused by record retention or compaction and offset reset policy LATEST.",
+ bootstrapPolledOffset,
+ bootstrapPolledOffset,
expectedBootstrapRecordOffset);
// truncate
- final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(polledOffset, false);
+ final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
incompletesToPrune.forEach(incompleteOffsets::remove);
} else if (pollBelowExpected) {
- // manual reset to lower offset detected
+ // reset to lower offset detected, so we need to reset our state to match
log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) - truncating state - all records " +
- "above (including this) will be replayed. Was expecting {} but bootstrap poll was {}.",
- polledOffset,
+ "above (including this) will be replayed. Was expecting {} but bootstrap poll was {}. " +
+ "Could be caused by record retention or compaction and offset reset policy EARLIEST.",
+ bootstrapPolledOffset,
expectedBootstrapRecordOffset,
- polledOffset
+ bootstrapPolledOffset
);
// reset
@@ -365,7 +370,7 @@ public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
// visible for testing
protected OffsetAndMetadata createOffsetAndMetadata() {
Optional<String> payloadOpt = tryToEncodeOffsets();
- long nextOffset = getNextExpectedInitialPolledOffset();
+ long nextOffset = getOffsetToCommit();
return payloadOpt
.map(encodedOffsets -> new OffsetAndMetadata(nextOffset, encodedOffsets))
.orElseGet(() -> new OffsetAndMetadata(nextOffset));
@@ -374,10 +379,10 @@ protected OffsetAndMetadata createOffsetAndMetadata() {
/**
* Next offset expected to be polled, upon freshly connecting to a broker.
*
- * Defines as the offset one below the highest sequentially succeeded offset.
+ * Defined as the offset one above the highest sequentially succeeded offset.
*/
// visible for testing
- protected long getNextExpectedInitialPolledOffset() {
+ protected long getOffsetToCommit() {
return getOffsetHighestSequentialSucceeded() + 1;
}
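Concretely (illustrative numbers): if offsets 0 through 41 have all succeeded sequentially, getOffsetHighestSequentialSucceeded() returns 41 and getOffsetToCommit() returns 42, matching Kafka's convention that the committed offset is the next offset to be read, not the last one processed.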
@@ -443,7 +448,7 @@ private Optional<String> tryToEncodeOffsets() {
try {
// todo refactor use of null shouldn't be needed. Is OffsetMapCodecManager stateful? remove null #233
OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(null);
- long offsetOfNextExpectedMessage = getNextExpectedInitialPolledOffset();
+ long offsetOfNextExpectedMessage = getOffsetToCommit();
String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this);
boolean mustStrip = updateBlockFromEncodingResult(offsetMapPayload);
if (mustStrip) {
@@ -523,20 +528,20 @@ private void maybeTruncateOrPruneTrackedOffsets(EpochAndRecordsMap<K, V>.Records
return;
}
- var low = getFirst(records).get().offset(); // NOSONAR see #isEmpty
+ var lowOffset = getFirst(records).get().offset(); // NOSONAR see #isEmpty
- maybeTruncateBelowOrAbove(low);
+ maybeTruncateBelowOrAbove(lowOffset);
// build the hash set once, so we can do random access checks of our tracked incompletes
var polledOffsetLookup = records.stream()
.map(ConsumerRecord::offset)
.collect(Collectors.toSet());
- var high = getLast(records).get().offset(); // NOSONAR see #isEmpty
+ var highOffset = getLast(records).get().offset(); // NOSONAR see #isEmpty
// for the incomplete offsets within this range of poll batch
- var incompletesWithinPolledBatch = incompleteOffsets.keySet().subSet(low, true, high, true);
var offsetsToRemoveFromTracking = new ArrayList<Long>();
+ var incompletesWithinPolledBatch = incompleteOffsets.keySet().subSet(lowOffset, true, highOffset, true);
for (long incompleteOffset : incompletesWithinPolledBatch) {
boolean offsetMissingFromPolledRecords = !polledOffsetLookup.contains(incompleteOffset);
@@ -553,8 +558,8 @@ private void maybeTruncateOrPruneTrackedOffsets(EpochAndRecordsMap<K, V>.Records
"base offset, after initial load and before a rebalance.",
offsetsToRemoveFromTracking,
getTp(),
- low,
- high
+ lowOffset,
+ highOffset
);
offsetsToRemoveFromTracking.forEach(incompleteOffsets::remove);
}
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java
index 903d9c098..9abc4cc46 100644
--- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java
@@ -6,6 +6,7 @@
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.ThreadUtils;
+import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
@@ -257,7 +258,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {
int i = Integer.parseInt(rec.value());
if (stepIndex != i) {
log.error("bad step: {} vs {}", stepIndex, i);
- throw new RuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i);
+ throw new FakeRuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i);
}
stepIndex++;
}
@@ -284,7 +285,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {
}
if (!missing.isEmpty())
log.error("Missing: {}", missing);
- throw new RuntimeException("bad step, expected message(s) is missing: " + missing);
+ throw new FakeRuntimeException("bad step, expected message(s) is missing: " + missing);
}
assertThat(producerSpy.history()).as("Finally, all messages expected messages were produced").hasSize(quantityOfMessagesToProduce);
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java
index 0feb10655..fa531181d 100644
--- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
@@ -157,7 +158,7 @@ private void runPcAndBlockRecordsOverLimitIndex(int blockOver) {
log.debug(msg("{} over block limit of {}, blocking...", index, blockOver));
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new FakeRuntimeException(e);
}
}
});
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java
index ab3649024..f26aa96fa 100644
--- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.java
@@ -4,10 +4,12 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
-import com.google.common.truth.Truth;
+import com.google.common.truth.StringSubject;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.ThreadUtils;
+import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
+import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
@@ -18,6 +20,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -25,13 +28,13 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
+import org.awaitility.Awaitility;
+import org.awaitility.core.TerminalFailureException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;
import pl.tlinkowski.unij.api.UniSets;
@@ -49,6 +52,7 @@
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.UNORDERED;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
import static pl.tlinkowski.unij.api.UniLists.of;
/**
@@ -69,6 +73,8 @@ class PartitionStateCommittedOffsetIT extends BrokerIntegrationTest {
+ ParallelEoSStreamProcessor<String, String> activePc;
+
@BeforeEach
void setup() {
setupTopic();
@@ -120,7 +126,7 @@ void compactedTopic() {
log.debug("First run produced, with compaction targets removed: {}", processedOnFirstRunWithTombstoneTargetsRemoved);
//
- triggerTombStoneProcessing();
+ triggerCompactionProcessing();
// The offsets of the tombstone targets should not be read in second run
final int expectedOffsetProcessedToSecondRun = TO_PRODUCE + compactedKeys.size();
@@ -158,7 +164,7 @@ private KafkaContainer setupCompactingKafkaBroker() {
setup();
}
- setupCompacted();
+ setupCompactedEnvironment();
return compactingBroker;
}
@@ -180,7 +186,7 @@ private static long getOffsetFromKey(String key) {
}
@SneakyThrows
- private void setupCompacted() {
+ private void setupCompactedEnvironment() {
log.debug("Setting up aggressive compaction...");
ConfigResource topicConfig = new ConfigResource(ConfigResource.Type.TOPIC, getTopic());
@@ -198,7 +204,7 @@ private void setupCompacted() {
}
@SneakyThrows
- private List<String> triggerTombStoneProcessing() {
+ private List<String> triggerCompactionProcessing() {
// send a lot of messages to fill up segments
List<String> keys = produceMessages(TO_PRODUCE * 2, "log-compaction-trigger-");
// or wait?
@@ -260,41 +266,69 @@ void committedOffsetLower() {
*/
@SneakyThrows
private void runPcCheckStartIs(long targetStartOffset, long checkUpTo, GroupOption groupOption) {
- try (var tempPc = super.getKcu().buildPc(PARTITION, groupOption);) {
- tempPc.subscribe(of(getTopic()));
-
- AtomicLong lowest = new AtomicLong(Long.MAX_VALUE);
- AtomicLong highest = new AtomicLong();
-
- tempPc.poll(recordContexts -> {
- long thisOffset = recordContexts.offset();
- if (thisOffset < lowest.get()) {
- log.debug("Found lowest offset {}", thisOffset);
- lowest.set(thisOffset);
- } else if (thisOffset > highest.get()) {
- highest.set(thisOffset);
- }
- });
+ var tempPc = super.getKcu().buildPc(PARTITION, groupOption);
+ tempPc.subscribe(of(getTopic()));
+
+ AtomicLong lowest = new AtomicLong(Long.MAX_VALUE);
+ AtomicLong highest = new AtomicLong(Long.MIN_VALUE);
+
+ AtomicLong bumpersSent = new AtomicLong();
+
+ tempPc.poll(recordContexts -> {
+ log.error("Consumed: {} Bumpers sent {}", recordContexts.offset(), bumpersSent);
+ long thisOffset = recordContexts.offset();
+ if (thisOffset < lowest.get()) {
+ log.error("Found lowest offset {}", thisOffset);
+ lowest.set(thisOffset);
+ } else if (thisOffset > highest.get()) {
+ highest.set(thisOffset);
+ }
+ });
+
+ //
+ if (offsetResetStrategy.equals(NONE)) {
+ Awaitility.await().untilAsserted(() -> assertThat(tempPc.isClosedOrFailed()).isFalse()); // started
+ Awaitility.await().untilAsserted(() -> assertThat(tempPc.isClosedOrFailed()).isTrue()); // crashed
+ var throwable = tempPc.getFailureCause();
+ StringSubject causeMessage = assertThat(ExceptionUtils.getRootCauseMessage(throwable));
+ causeMessage.contains("NoOffsetForPartitionException");
+ causeMessage.contains("Undefined offset with no reset policy");
+
+ getKcu().close();
+ } else {
+ Awaitility.await()
+ .pollInterval(5, SECONDS) // allow bumper messages to propagate
+ .atMost(30, SECONDS) // so, allow more for more total time
+ .failFast(tempPc::isClosedOrFailed)
+ .untilAsserted(() -> {
+ // in case we're at the end of the topic, add some messages to make sure we get a poll response
+ // must go before failing assertion, otherwise won't be reached
+ getKcu().getProducer().send(new ProducerRecord<>(getTopic(), "key-bumper", "poll-bumper"));
+ bumpersSent.incrementAndGet();
+
+ final long startOffset = getKcu().getAdmin().listOffsets(UniMaps.of(tp, OffsetSpec.earliest())).partitionResult(tp).get().offset();
+ final long endOffset = getKcu().getAdmin().listOffsets(UniMaps.of(tp, OffsetSpec.latest())).partitionResult(tp).get().offset();
+ log.debug("start await loop: {}, end: {}, bumpersSent: {}", startOffset, endOffset, bumpersSent);
+
+ //
+ assertWithMessage("Highest seen offset to read up to")
+ .that(highest.get())
+ .isAtLeast(checkUpTo - 1);
+ });
+
+ log.warn("Offset started at should equal the target {}, lowest {}, sent {}, diff is {})", targetStartOffset, lowest, bumpersSent, lowest.get() - targetStartOffset);
+
+ assertWithMessage("Offset started at should equal the target (sent %s , diff is %s)",
+ bumpersSent,
+ lowest.get() - targetStartOffset
+ )
+ .that(lowest.get())
+ .isEqualTo(targetStartOffset);
//
- AtomicLong bumpersSent = new AtomicLong();
- Awaitility.await().untilAsserted(() -> {
- // in case we're at the end of the topic, add some messages to make sure we get a poll response
- getKcu().produceMessages(getTopic(), 1, "poll-bumper");
- bumpersSent.incrementAndGet();
-
- assertWithMessage("Highest seen offset")
- .that(highest.get())
- .isAtLeast(checkUpTo - 1);
- });
-
- var adjustExpected = switch (offsetResetStrategy) {
- case EARLIEST -> targetStartOffset;
- case LATEST -> targetStartOffset + 1;
- case NONE -> throw new IllegalStateException("NONE not supported");
- };
- assertWithMessage("Offset started as").that(lowest.get()).isEqualTo(adjustExpected);
+ tempPc.close();
}
+
}
@SneakyThrows
@@ -319,6 +353,7 @@ private List<PollContext<String, String>> runPcUntilOffset(OffsetResetStrategy o
log.debug("Running PC until at least offset {}", succeedUpToOffset);
super.getKcu().setOffsetResetPolicy(offsetResetPolicy);
var tempPc = super.getKcu().buildPc(UNORDERED, newGroup);
+ activePc = tempPc;
try { // can't use auto closeable because close is complicated as it's expected to crash and close rethrows error
SortedSet<PollContext<String, String>> seenOffsets = Collections.synchronizedSortedSet(new TreeSet<>(Comparator.comparingLong(PollContext::offset)));
@@ -333,7 +368,7 @@ private List<PollContext<String, String>> runPcUntilOffset(OffsetResetStrategy o
log.debug("Exceptional offset {} succeeded", thisOffset);
} else if (thisOffset >= succeedUpToOffset) {
log.debug("Failing on {}", thisOffset);
- throw new RuntimeException("Failing on " + thisOffset);
+ throw new FakeRuntimeException("Failing on " + thisOffset);
} else {
log.debug("Succeeded {}: {}", thisOffset, pollContext.getSingleRecord());
succeededOffsets.add(pollContext);
@@ -345,34 +380,21 @@ private List> runPcUntilOffset(OffsetResetStrategy o
getKcu().produceMessages(getTopic(), 1, "poll-bumper");
- if (offsetResetPolicy.equals(OffsetResetStrategy.NONE)) {
- Awaitility.await().untilAsserted(() -> {
- assertWithMessage("PC crashed / failed fast").that(tempPc.isClosedOrFailed()).isTrue();
- assertThat(tempPc.getFailureCause()).hasCauseThat().hasMessageThat().contains("Error in BrokerPollSystem system");
- var stackTrace = ExceptionUtils.getStackTrace(tempPc.getFailureCause());
- Truth.assertThat(stackTrace).contains("Undefined offset with no reset policy for partitions");
- });
- return UniLists.of();
- } else {
-
- Awaitility.await()
- .failFast(tempPc::isClosedOrFailed)
- .untilAsserted(() -> {
- assertThat(seenOffsets).isNotEmpty();
- assertThat(seenOffsets.last().offset()).isGreaterThan(expectedProcessToOffset - 2);
- });
-
- if (!succeededOffsets.isEmpty()) {
- log.debug("Succeeded up to: {}", succeededOffsets.last().offset());
- }
- log.debug("Consumed up to {}", seenOffsets.last().offset());
-
- var sorted = new ArrayList<>(seenOffsets);
- Collections.sort(sorted, Comparator.comparingLong(PollContext::offset));
+ Awaitility.await()
+ .failFast(tempPc::isClosedOrFailed)
+ .untilAsserted(() -> {
+ assertThat(seenOffsets).isNotEmpty();
+ assertThat(seenOffsets.last().offset()).isGreaterThan(expectedProcessToOffset - 2);
+ });
-
- return sorted;
+ if (!succeededOffsets.isEmpty()) {
+ log.debug("Succeeded up to: {}", succeededOffsets.last().offset());
}
+ log.debug("Consumed up to {}", seenOffsets.last().offset());
+
+ var sorted = new ArrayList<>(seenOffsets);
+ Collections.sort(sorted, Comparator.comparingLong(PollContext::offset));
+ return sorted;
} finally {
try {
tempPc.close(); // close manually in this branch only, as in other branch it crashes
@@ -394,12 +416,10 @@ void committedOffsetHigher() {
final int moveToOffset = 75;
- // reslolve groupId mess
+ // resolve groupId mess
moveCommittedOffset(getKcu().getGroupId(), moveToOffset);
runPcCheckStartIs(moveToOffset, quantity);
- var gkcu5 = getKcu().getConsumer().groupMetadata().groupId();
-
}
private void runPcCheckStartIs(int targetStartOffset, int checkUpTo) {
@@ -409,7 +429,10 @@ private void runPcCheckStartIs(int targetStartOffset, int checkUpTo) {
/**
* CG offset has disappeared - committed offset hasn't been changed, but broker gives us a bootstrap poll result
* with a higher offset than expected. Could be caused by retention period, or compaction.
+ *
+ * @see #noOffsetPolicyOnStartup
*/
+ @SneakyThrows
@EnumSource(value = OffsetResetStrategy.class)
@ParameterizedTest
void committedOffsetRemoved(OffsetResetStrategy offsetResetPolicy) {
@@ -423,18 +446,25 @@ void committedOffsetRemoved(OffsetResetStrategy offsetResetPolicy) {
clientUtils.setOffsetResetPolicy(offsetResetPolicy);
clientUtils.open();
+ if (offsetResetPolicy.equals(NONE)) {
+ // no reset policy, so we set initial offset to zero, to avoid crash on startup - see startup test
+ var consumer = getKcu().getConsumer();
+ consumer.subscribe(of(getTopic()));
+ consumer.poll(Duration.ofSeconds(1));
+ // commit offset 0 to partition
+ consumer.commitSync(UniMaps.of(tp, new OffsetAndMetadata(0)));
+ consumer.close();
+ }
+
var producedCount = produceMessages(TO_PRODUCE).size();
final int END_OFFSET = 50;
var groupId = clientUtils.getGroupId();
- runPcUntilOffset(offsetResetPolicy, END_OFFSET);
+ runPcUntilOffset(offsetResetPolicy, END_OFFSET, END_OFFSET, UniSets.of(), GroupOption.REUSE_GROUP);
- //
- if (offsetResetPolicy.equals(OffsetResetStrategy.NONE)) {
- // test finished
- return;
- }
+ producedCount = producedCount + 1; // run sends one
+ //
final String compactedKey = "key-50";
// before compaction
@@ -449,16 +479,17 @@ void committedOffsetRemoved(OffsetResetStrategy offsetResetPolicy) {
final int EXPECTED_RESET_OFFSET = switch (offsetResetPolicy) {
case EARLIEST -> 0;
- case LATEST -> producedCount + 4;
+ case LATEST -> producedCount;
case NONE -> -1; // will crash / fail fast
};
+
clientUtils.setGroupId(groupId);
runPcCheckStartIs(EXPECTED_RESET_OFFSET, producedCount);
}
}
- private void checkHowManyRecordsWithKeyPresent(String keyToSearchFor, int expectedQuantityToFind, long upToOffset) {
- log.debug("Looking for {} records with key {} up to offset {}", expectedQuantityToFind, keyToSearchFor, upToOffset);
+ private void checkHowManyRecordsWithKeyPresent(String keyToSearchFor, int expectedQuantityToFind, long searchUpToOffset) {
+ log.debug("Looking for {} records with key {} up to offset {}", expectedQuantityToFind, keyToSearchFor, searchUpToOffset);
try (KafkaConsumer<String, String> newConsumer = getKcu().createNewConsumer(GroupOption.NEW_GROUP);) {
newConsumer.assign(of(tp));
@@ -467,7 +498,7 @@ private void checkHowManyRecordsWithKeyPresent(String keyToSearchFor, int expect
assertThat(positionAfter).isEqualTo(0);
final List<ConsumerRecord<String, String>> records = new ArrayList<>();
long highest = -1;
- while (highest < upToOffset - 1) {
+ while (highest < searchUpToOffset - 1) {
ConsumerRecords<String, String> poll = newConsumer.poll(Duration.ofSeconds(1));
records.addAll(poll.records(tp));
var lastOpt = getLast(records);
@@ -488,7 +519,7 @@ private int causeCommittedOffsetToBeRemoved(long offset) {
checkHowManyRecordsWithKeyPresent("key-" + offset, 2, TO_PRODUCE + 2);
- List<String> strings = triggerTombStoneProcessing();
+ List<String> strings = triggerCompactionProcessing();
return 2 + strings.size();
}
@@ -501,4 +532,32 @@ private void sendCompactionKeyForOffset(long offset) throws InterruptedException
.get(1, SECONDS);
}
+ /**
+ * When there's no offset reset policy and there are no committed offsets for the consumer group, PC should fail
+ * fast, passing the exception up to the user.
+ */
+ @Test
+ void noOffsetPolicyOnStartup() {
+ this.offsetResetStrategy = NONE;
+ try (
+ KafkaClientUtils clientUtils = new KafkaClientUtils(kafkaContainer);
+ ) {
+ clientUtils.setOffsetResetPolicy(offsetResetStrategy);
+ clientUtils.open();
+
+ var producedCount = produceMessages(TO_PRODUCE).size();
+
+ try {
+ runPcUntilOffset(offsetResetStrategy, producedCount, producedCount, UniSets.of(), GroupOption.REUSE_GROUP);
+ } catch (TerminalFailureException e) {
+ var failureCause = activePc.getFailureCause();
+ var rootCauseMessage = ExceptionUtils.getRootCauseMessage(failureCause);
+ var message = assertThat(rootCauseMessage);
+ message.contains("NoOffsetForPartitionException");
+ message.contains("Undefined offset");
+ message.contains("no reset policy");
+ }
+ }
+ }
+
}
\ No newline at end of file
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java
index 8f57c385b..838366818 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeException.java
@@ -4,11 +4,13 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import lombok.experimental.StandardException;
+
/**
* Used for testing error handling - easier to identify than a plain exception.
+ *
+ * @author Antony Stubbs
*/
-public class FakeRuntimeException extends RuntimeException {
- public FakeRuntimeException(String msg) {
- super(msg);
- }
+@StandardException
+public class FakeRuntimeException extends PCRetriableException {
}
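Lombok's @StandardException generates the four canonical exception constructors, so the single hand-written constructor can be dropped while the class gains the cause-taking variants. Roughly equivalent to:

```java
public class FakeRuntimeException extends PCRetriableException {
    public FakeRuntimeException() { this(null, null); }
    public FakeRuntimeException(String message) { this(message, null); }
    public FakeRuntimeException(Throwable cause) { this(cause != null ? cause.getMessage() : null, cause); }
    public FakeRuntimeException(String message, Throwable cause) { super(message, cause); }
}
```

The same annotation on PCRetriableException supplies the matching (String, Throwable) super constructor.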
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java
index f7c243a7c..c91d4877e 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java
@@ -71,7 +71,7 @@ public void failingActionNothingCommitted(CommitMode commitMode) {
setupParallelConsumerInstance(commitMode);
parallelConsumer.poll((ignore) -> {
- throw new RuntimeException("My user's function error");
+ throw new FakeRuntimeException("My user's function error");
});
// let it process
@@ -108,7 +108,7 @@ void offsetsAreNeverCommittedForMessagesStillInFlightSimplest(CommitMode commitM
// finish processing only msg 1
parallelConsumer.poll(context -> {
- log.error("msg: {}", context);
+ log.debug("msg: {}", context);
startBarrierLatch.countDown();
int offset = (int) context.offset();
LatchTestUtils.awaitLatch(locks, offset);
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
index c93a1f00f..c7096e6c3 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
@@ -132,7 +132,7 @@ void bootstrapPollOffsetHigherDueToRetentionOrCompaction() {
addPollToState(state, polledTestBatch);
//
- Truth.assertThat(state.getNextExpectedInitialPolledOffset()).isEqualTo(unexpectedlyHighOffset);
+ Truth.assertThat(state.getOffsetToCommit()).isEqualTo(unexpectedlyHighOffset);
OffsetAndMetadata offsetAndMetadata = state.createOffsetAndMetadata();
assertThat(offsetAndMetadata).getOffset().isEqualTo(unexpectedlyHighOffset);
diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java
index 4506566a0..1b1ac4889 100644
--- a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java
+++ b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java
@@ -141,6 +141,6 @@ void maxRetries() {
final int maxRetries = 10;
final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();

pc.poll(context -> {
var consumerRecord = context.getSingleRecord().getConsumerRecord();
Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
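The hunk truncates the rest of the example; a sketch of the complete bounded-retry pattern it illustrates, with processRecord as a hypothetical processing step:

```java
pc.poll(context -> {
    var consumerRecord = context.getSingleRecord().getConsumerRecord();
    Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
    if (retryCount < maxRetries) {
        try {
            processRecord(consumerRecord); // hypothetical; may throw
            retriesCount.remove(consumerRecord); // success - stop tracking
        } catch (Exception e) {
            retriesCount.put(consumerRecord, retryCount + 1);
            throw new PCRetriableException("attempt " + retryCount + " failed", e); // quiet retry
        }
    } else {
        log.warn("Giving up on record after {} attempts: {}", retryCount, consumerRecord);
        retriesCount.remove(consumerRecord);
        // returning normally marks the record as succeeded, so it is skipped
    }
});
```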
diff --git a/pom.xml b/pom.xml
index d808fc3e5..32eceda1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
0.7.4
4.7.0
0.1.1
- 0.4.2
+ 1.0.0