diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java
new file mode 100644
index 000000000..a23619e40
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java
@@ -0,0 +1,33 @@
+package io.confluent.parallelconsumer;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import lombok.ToString;
+
+/**
+ * A user's processing function can throw this exception to signal to PC that processing of the message has failed and
+ * that it should be retried at a later time.
+ *
+ * The advantage of throwing this exception explicitly is that PC will not log an ERROR. If any other type of exception
+ * is thrown by the user's function, it will be logged as an error (but will still be retried later).
+ *
+ * In short: if this exception is thrown, nothing is logged (except at DEBUG level); any other exception is logged as
+ * an error.
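+ *
+ * A minimal sketch of usage ({@code parallelConsumer} is an already-built {@link ParallelStreamProcessor} and
+ * {@code sendToFlakyService} stands in for the user's own processing):
+ * <pre>{@code
+ * parallelConsumer.poll(context -> {
+ *     try {
+ *         sendToFlakyService(context);
+ *     } catch (Exception e) {
+ *         // quietly retried later instead of being logged as an ERROR
+ *         throw new PCRetriableException("downstream call failed, will retry", e);
+ *     }
+ * });
+ * }</pre>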
+ */
+@ToString
+public class PCRetriableException extends PCUserException {
+
+ public PCRetriableException(String message) {
+ super(message);
+ }
+
+ public PCRetriableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PCRetriableException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java
new file mode 100644
index 000000000..922bf1de3
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java
@@ -0,0 +1,29 @@
+package io.confluent.parallelconsumer;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+/**
+ * A user's processing function can throw this exception to signal to PC that processing of the message has failed in
+ * a way that will never succeed, and so should NOT be retried.
+ *
+ * How PC reacts is controlled by the configured {@link ParallelConsumerOptions.TerminalFailureReaction} - the record
+ * can either be skipped (treated as if it had succeeded) or the PC instance can be shut down.
+ *
+ * For failures that should simply be retried later, throw {@link PCRetriableException} instead.
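+ *
+ * A minimal sketch of usage ({@code parallelConsumer}, {@code isProcessable} and {@code process} stand in for the
+ * user's own code):
+ * <pre>{@code
+ * parallelConsumer.poll(context -> {
+ *     if (!isProcessable(context)) {
+ *         // retrying will never help - PC reacts per the configured TerminalFailureReaction
+ *         throw new PCTerminalException("record can never be processed");
+ *     }
+ *     process(context);
+ * });
+ * }</pre>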
+ */
+public class PCTerminalException extends PCUserException {
+ public PCTerminalException(String message) {
+ super(message);
+ }
+
+ public PCTerminalException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PCTerminalException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java
new file mode 100644
index 000000000..58fcf0484
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java
@@ -0,0 +1,22 @@
+package io.confluent.parallelconsumer;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+/**
+ * Base of the user-facing exception types that a user's processing function can throw to tell PC how to react to a
+ * failure - see {@link PCRetriableException} and {@link PCTerminalException}.
+ */
+public class PCUserException extends RuntimeException {
+ public PCUserException(String message) {
+ super(message);
+ }
+
+ public PCUserException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PCUserException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
index 771e3e52d..3f6e92240 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
@@ -223,6 +223,14 @@ public boolean isUsingBatching() {
@Builder.Default
private final int maxFailureHistory = 10;
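+
+ /**
+ * How PC reacts when the user's function throws a {@link PCTerminalException}: {@code SHUTDOWN} stops this PC
+ * instance, {@code SKIP} marks the record as handled and moves on.
+ *
+ * A minimal sketch of setting it via the options builder ({@code kafkaConsumer} stands in for your consumer
+ * instance; no default value is implied here):
+ * <pre>{@code
+ * var options = ParallelConsumerOptions.<String, String>builder()
+ *         .consumer(kafkaConsumer)
+ *         .terminalFailureReaction(TerminalFailureReaction.SKIP)
+ *         .build();
+ * }</pre>
+ */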
+ private final TerminalFailureReaction terminalFailureReaction;
+
+ public enum TerminalFailureReaction {
+ SHUTDOWN,
+ SKIP,
+ // DLQ, TODO
+ }
+
/**
* @return the combined target of the desired concurrency by the configured batch size
*/
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
index 5f186fb93..6ce59ffcf 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -13,7 +13,6 @@
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -53,13 +52,12 @@
public abstract class AbstractParallelEoSStreamProcessor<K, V> implements ParallelConsumer<K, V>, ConsumerRebalanceListener, Closeable {
public static final String MDC_INSTANCE_ID = "pcId";
- public static final String MDC_OFFSET_MARKER = "offset";
/**
* Key for the work container descriptor that will be added to the {@link MDC diagnostic context} while inside a
* user function.
*/
- private static final String MDC_WORK_CONTAINER_DESCRIPTOR = "offset";
+ static final String MDC_WORK_CONTAINER_DESCRIPTOR = "offset";
@Getter(PROTECTED)
protected final ParallelConsumerOptions options;
@@ -765,6 +763,7 @@ protected <R> void submitWorkToPool(Function<PollContextInternal<K, V>, List<R>>
}
}
+
private <R> void submitWorkToPoolInner(final Function<PollContextInternal<K, V>, List<R>> usersFunction,
final Consumer<R> callback,
final List<WorkContainer<K, V>> batch) {
@@ -772,7 +771,8 @@ private <R> void submitWorkToPoolInner(final Function<PollContextInternal<K, V>,
log.trace("Sending work ({}) to pool", batch);
Future outputRecordFuture = workerThreadPool.submit(() -> {
addInstanceMDC();
- return runUserFunction(usersFunction, callback, batch);
+ UserFunctionRunner<K, V> runner = new UserFunctionRunner<>(this);
+ return runner.runUserFunction(usersFunction, callback, batch);
});
// for a batch, each message in the batch shares the same result
for (final WorkContainer<K, V> workContainer : batch) {
@@ -1087,61 +1087,6 @@ private void updateLastCommitCheckTime() {
lastCommitCheckTime = Instant.now();
}
- /**
- * Run the supplied function.
- */
- protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction,
- Consumer<R> callback,
- List<WorkContainer<K, V>> workContainerBatch) {
- // call the user's function
- List<R> resultsFromUserFunction;
- try {
- if (log.isDebugEnabled()) {
- // first offset of the batch
- MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, workContainerBatch.get(0).offset() + "");
- }
- log.trace("Pool received: {}", workContainerBatch);
-
- //
- boolean workIsStale = wm.checkIfWorkIsStale(workContainerBatch);
- if (workIsStale) {
- // when epoch's change, we can't remove them from the executor pool queue, so we just have to skip them when we find them
- log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch);
- return null;
- }
-
- PollContextInternal<K, V> context = new PollContextInternal<>(workContainerBatch);
- resultsFromUserFunction = usersFunction.apply(context);
-
- for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) {
- onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
- }
-
- // capture each result, against the input record
- var intermediateResults = new ArrayList<Tuple<ConsumerRecord<K, V>, R>>();
- for (R result : resultsFromUserFunction) {
- log.trace("Running users call back...");
- callback.accept(result);
- }
-
- // fail or succeed, either way we're done
- for (var kvWorkContainer : workContainerBatch) {
- addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
- }
- log.trace("User function future registered");
-
- return intermediateResults;
- } catch (Exception e) {
- // handle fail
- log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e);
- for (var wc : workContainerBatch) {
- wc.onUserFunctionFailure(e);
- addToMailbox(wc); // always add on error
- }
- throw e; // trow again to make the future failed
- }
- }
-
protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
addToMailbox(wc);
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java
new file mode 100644
index 000000000..6e7ccf821
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java
@@ -0,0 +1,182 @@
+package io.confluent.parallelconsumer.internal;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import io.confluent.parallelconsumer.*;
+import io.confluent.parallelconsumer.ParallelConsumer.Tuple;
+import io.confluent.parallelconsumer.state.WorkContainer;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.MDC;
+import pl.tlinkowski.unij.api.UniLists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static io.confluent.csid.utils.StringUtils.msg;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP;
+import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_WORK_CONTAINER_DESCRIPTOR;
+
+/**
+ * Runs the user's processing function and manages the outcome - in particular, failure handling.
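+ *
+ * In summary: work from a stale epoch is skipped; a {@link PCRetriableException} is logged at DEBUG and the records
+ * are queued for retry; any other exception is logged at WARN and also retried; a {@link PCTerminalException} is
+ * handled according to the configured {@link ParallelConsumerOptions.TerminalFailureReaction} (skip or shut down).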
+ */
+@AllArgsConstructor
+@Slf4j
+public class UserFunctionRunner<K, V> {
+
+ private final AbstractParallelEoSStreamProcessor<K, V> pc;
+
+ /**
+ * Run the supplied function.
+ */
+ protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction,
+ Consumer<R> callback,
+ List<WorkContainer<K, V>> workContainerBatch) {
+ // catch and process any internal error
+ try {
+ if (log.isDebugEnabled()) {
+ // first offset of the batch
+ MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, workContainerBatch.get(0).offset() + "");
+ }
+ log.trace("Pool received: {}", workContainerBatch);
+
+ //
+ boolean workIsStale = pc.getWm().checkIfWorkIsStale(workContainerBatch);
+ if (workIsStale) {
+ // when epochs change, we can't remove them from the executor pool queue, so we just have to skip them when we find them
+ log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch);
+ return UniLists.of();
+ }
+
+ PollContextInternal<K, V> context = new PollContextInternal<>(workContainerBatch);
+
+ return runWithUserExceptions(usersFunction, context, callback);
+ } catch (PCUserException e) {
+ // throw again to make the future failed
+ throw e;
+ } catch (Exception e) {
+ log.error("Unknown internal error handling user function dispatch, terminating");
+
+ pc.closeDontDrainFirst();
+
+ // throw again to make the future failed
+ throw e;
+ }
+ }
+
+ private <R> List<Tuple<ConsumerRecord<K, V>, R>> runWithUserExceptions(
+ Function<? super PollContextInternal<K, V>, ? extends List<R>> usersFunction,
+ PollContextInternal<K, V> context,
+ Consumer<R> callback) {
+ try {
+ var resultsFromUserFunction = usersFunction.apply(context);
+ return handleUserSuccess(callback, context.getWorkContainers(), resultsFromUserFunction);
+ } catch (PCTerminalException e) {
+ return handleUserTerminalFailure(context, e, callback);
+ } catch (PCRetriableException e) {
+ handleExplicitUserRetriableFailure(context, e);
+
+ // throw again to make the future failed
+ throw e;
+ } catch (Exception e) {
+ handleImplicitUserRetriableFailure(context, e);
+
+ // throw again to make the future failed
+ throw e;
+ }
+ }
+
+ private <R> List<Tuple<ConsumerRecord<K, V>, R>> handleUserSuccess(Consumer<R> callback,
+ List<WorkContainer<K, V>> workContainerBatch,
+ List<R> resultsFromUserFunction) {
+ for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) {
+ pc.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
+ }
+
+ // capture each result, against the input record
+ var intermediateResults = new ArrayList<Tuple<ConsumerRecord<K, V>, R>>();
+ for (R result : resultsFromUserFunction) {
+ log.trace("Running users call back...");
+ callback.accept(result);
+ }
+
+ // fail or succeed, either way we're done
+ for (var kvWorkContainer : workContainerBatch) {
+ pc.addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
+ }
+
+ log.trace("User function future registered");
+ return intermediateResults;
+ }
+
+ private <R> List<Tuple<ConsumerRecord<K, V>, R>> handleUserTerminalFailure(PollContextInternal<K, V> context,
+ PCTerminalException e, Consumer<R> callback) {
+ var reaction = pc.getOptions().getTerminalFailureReaction();
+
+ if (reaction == SKIP) {
+ log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}",
+ ParallelConsumerOptions.class.getSimpleName(),
+ context);
+
+ // return empty result to cause system to skip as if it succeeded
+ return handleUserSuccess(callback, context.getWorkContainers(), UniLists.of());
+ } else if (reaction == SHUTDOWN) {
+ log.error("Shutting down upon terminal failure in user function due to {} {} setting in {} - triggering context: {}",
+ reaction,
+ ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(),
+ ParallelConsumerOptions.class.getSimpleName(),
+ context);
+
+ pc.closeDontDrainFirst();
+
+ // throw again to make the future failed
+ throw e;
+ } else {
+ throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction));
+ }
+ }
+
+ private void handleExplicitUserRetriableFailure(PollContextInternal<K, V> context, PCRetriableException e) {
+ logUserFunctionException(e);
+ markRecordsFailed(context.getWorkContainers(), e);
+ }
+
+ private void handleImplicitUserRetriableFailure(PollContextInternal<K, V> context, Exception e) {
+ logUserFunctionException(e);
+ markRecordsFailed(context.getWorkContainers(), e);
+ }
+
+ private void markRecordsFailed(List<WorkContainer<K, V>> workContainerBatch, Exception e) {
+ for (var wc : workContainerBatch) {
+ markRecordFailed(e, wc);
+ }
+ }
+
+ private void markRecordFailed(Exception e, WorkContainer<K, V> wc) {
+ wc.onUserFunctionFailure(e);
+ pc.addToMailbox(wc); // always add on error
+ }
+
+ /**
+ * If the user explicitly throws a {@link PCRetriableException}, don't log it (beyond DEBUG level), as the user is
+ * already aware.
+ *
+ * Retriable or Retryable? Kafka uses Retriable, so we'll go with that ;)
+ */
+ private void logUserFunctionException(Exception e) {
+ var message = "in user function, registering record as failed, returning to queue";
+ if (e instanceof PCRetriableException) {
+ log.debug("Explicit exception {} caught - {}", message, e.toString());
+ } else {
+ log.warn("Exception {}", message, e);
+ }
+ }
+
+}
+
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java
index 55bcb3acd..8cbc483cc 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java
@@ -1,13 +1,13 @@
package io.confluent.parallelconsumer;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
/**
* Used for testing error handling - easier to identify than a plain exception.
*/
-public class FakeRuntimeError extends RuntimeException {
+public class FakeRuntimeError extends PCRetriableException {
public FakeRuntimeError(String msg) {
super(msg);
}
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java
new file mode 100644
index 000000000..84d928b27
--- /dev/null
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java
@@ -0,0 +1,56 @@
+package io.confluent.parallelconsumer.internal;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import io.confluent.parallelconsumer.PCTerminalException;
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction;
+import io.confluent.parallelconsumer.PollContextInternal;
+import io.confluent.parallelconsumer.state.ModelUtils;
+import io.confluent.parallelconsumer.state.WorkManager;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.function.Function;
+
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * @see UserFunctionRunner
+ */
+class UserFunctionRunnerTest {
+
+ @Test
+ void shutdown() {
+ run(SHUTDOWN, context -> {
+ throw new PCTerminalException("fake");
+ });
+ }
+
+ private void run(TerminalFailureReaction shutdown, Function, List