Skip to content

Commit cc379ec

Browse files
committed
Experiment: extend Consumer and Function - centralises documentation and makes usage more explicit
rename step step
1 parent a9ded2d commit cc379ec

File tree

6 files changed

+61
-7
lines changed

6 files changed

+61
-7
lines changed

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public ParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions
3333
}
3434

3535
@Override
36-
public void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction) {
36+
public void poll(RecordProcessor.PollConsumer<K, V> usersVoidConsumptionFunction) {
3737
Function<PollContextInternal<K, V>, List<Object>> wrappedUserFunc = (context) -> {
3838
log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context);
3939

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Copyright (C) 2020-2022 Confluent, Inc.
55
*/
66

7+
import io.confluent.parallelconsumer.RecordProcessor.PollConsumerAndProducer;
78
import io.confluent.parallelconsumer.internal.DrainingCloseable;
89
import lombok.Data;
910
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -27,20 +28,19 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(Paralle
2728
}
2829

2930
/**
30-
* Register a function to be applied in parallel to each received message
31+
* Register a function to be applied in parallel to each received message.
3132
*
3233
* @param usersVoidConsumptionFunction the function
3334
*/
34-
void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);
35-
35+
void poll(RecordProcessor.PollConsumer<K, V> usersVoidConsumptionFunction);
3636

3737
/**
3838
* Register a function to be applied in parallel to each received message, which in turn returns one or more {@link
3939
* ProducerRecord}s to be sent back to the broker.
4040
*
4141
* @param callback applied after the produced message is acknowledged by kafka
4242
*/
43-
void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction,
43+
void pollAndProduceMany(PollConsumerAndProducer<K, V> userFunction,
4444
Consumer<ConsumeProduceResult<K, V, K, V>> callback);
4545

4646
/**
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.confluent.parallelconsumer;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
import org.apache.kafka.clients.producer.ProducerRecord;
5+
6+
import java.util.List;
7+
8+
/**
9+
* Types of user functions used for processing records.
10+
*/
11+
public interface RecordProcessor {
12+
13+
/**
14+
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
15+
*/
16+
@FunctionalInterface
17+
interface PollConsumer<K, V> extends java.util.function.Consumer<PollContext<K, V>> {
18+
19+
/**
20+
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
21+
* <p>
22+
* User can throw a {@link PCRetriableException}, if an issue is and PC should handle the process of retrying it
23+
* later. If an exception is thrown that doesn't extend {@link PCRetriableException}, the error will be logged
24+
* at {@code WARN} level.
25+
*
26+
* @param records the Kafka records to process
27+
* @see PCRetriableException
28+
* @see ParallelConsumerOptions#getRetryDelayProvider()
29+
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
30+
*/
31+
void accept(PollContext records);
32+
}
33+
34+
@FunctionalInterface
35+
interface PollConsumerAndProducer<K, V> extends java.util.function.Function<PollContext<K, V>, List<ProducerRecord<K, V>>> {
36+
37+
/**
38+
* Like {@link PollConsumer#accept(PollContext)} but also returns records to be produced back to Kafka.
39+
*
40+
* @param records the Kafka records to process
41+
* @return the function result
42+
*/
43+
@Override
44+
List<ProducerRecord<K, V>> apply(PollContext records);
45+
46+
}
47+
}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
import io.confluent.csid.utils.TimeUtils;
88
import io.confluent.parallelconsumer.*;
9+
import io.confluent.parallelconsumer.PCRetriableException;
10+
import io.confluent.parallelconsumer.ParallelConsumer;
11+
import io.confluent.parallelconsumer.ParallelConsumerOptions;
12+
import io.confluent.parallelconsumer.PollContextInternal;
913
import io.confluent.parallelconsumer.state.WorkContainer;
1014
import io.confluent.parallelconsumer.state.WorkManager;
1115
import lombok.*;

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
*/
66

77
import io.confluent.parallelconsumer.ErrorInUserFunctionException;
8+
import io.confluent.parallelconsumer.RecordProcessor;
89
import lombok.experimental.UtilityClass;
910

1011
import java.util.function.BiFunction;
11-
import java.util.function.Consumer;
1212
import java.util.function.Function;
1313

1414
/**
@@ -56,7 +56,7 @@ public static <PARAM, RESULT> RESULT carefullyRun(Function<PARAM, RESULT> wrappe
5656
* @param wrappedFunction the function to run
5757
* @param userFuncParam the parameter to pass into the user's function
5858
*/
59-
public static <PARAM> void carefullyRun(Consumer<PARAM> wrappedFunction, PARAM userFuncParam) {
59+
public static <PARAM> void carefullyRun(RecordProcessor.PollConsumer<PARAM> wrappedFunction, PARAM userFuncParam) {
6060
try {
6161
wrappedFunction.accept(userFuncParam);
6262
} catch (Exception e) {

parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ void maxRetries() {
141141
final int maxRetries = 10;
142142
final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();
143143

144+
pc.pollAndProduceMany();
145+
pc.poll(records ->);
146+
144147
pc.poll(context -> {
145148
var consumerRecord = context.getSingleRecord().getConsumerRecord();
146149
Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);

0 commit comments

Comments
 (0)