13 changes: 13 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,13 @@
# Please see the documentation for all configuration options:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
- package-ecosystem: "maven" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "daily"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"


17 changes: 8 additions & 9 deletions .idea/runConfigurations/All_Modules.xml


8 changes: 8 additions & 0 deletions .idea/runConfigurations/All_Modules___Unit_Tests.xml


12 changes: 12 additions & 0 deletions .idea/runConfigurations/All_Reactor_Unit_Tests.xml


12 changes: 12 additions & 0 deletions .idea/runConfigurations/All_Vertx_Unit_Tests.xml


64 changes: 60 additions & 4 deletions README.adoc
@@ -10,7 +10,7 @@
//


= parallel-consumer
= Confluent Parallel Consumer
:icons:
:toc: macro
:toclevels: 3
@@ -73,13 +73,28 @@ It also opens up new use cases like extreme parallelism, external data enrichment

An overview article to the library can also be found on Confluent's https://www.confluent.io/blog/[blog]: https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/[Introducing the Confluent Parallel Consumer].

[[demo]]
[#demo]
== Demo

.Relative speed demonstration
--
.Click on the animated SVG image to open the https://asciinema.org/a/404299[Asciinema.org player].
image::https://gist.githubusercontent.com/astubbs/26cccaf8b624a53ae26a52dbc00148b1/raw/cbf558b38b0aa624bd7637406579d2a8f00f51db/demo.svg[link="https://asciinema.org/a/404299"]
--

:talk_link: https://www.confluent.io/en-gb/events/kafka-summit-europe-2021/introducing-confluent-labs-parallel-consumer-client/
:talk_preview_image: https://play.vidyard.com/5MLb1Xh7joEQ7phxPxiyPK.jpg

[#talk]
== Video Overview

.Kafka Summit Europe 2021 Presentation
--
.A video overview of the library, along with slides, can be found on the {talk_link}[Kafka Summit Europe 2021 page for the presentation].
[link={talk_link}]
image::{talk_preview_image}[Talk]
--

'''

toc::[]
@@ -220,6 +235,7 @@ without operational burden or harming the cluster's performance
* Solution for the https://en.wikipedia.org/wiki/Head-of-line_blocking["head of line"] blocking problem where continued failure of a single message, prevents progress for messages behind it in the queue
* Per `key` concurrent processing, per partition and unordered message processing
* Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries
* Batch version of the API, to process batches of messages in parallel instead of one message at a time
* Vert.x and Reactor.io non-blocking library integration
** Non-blocking I/O work management
** Vert.x's WebClient and general Vert.x Future support
@@ -416,6 +432,7 @@ Where `${project.version}` is the version to be used:

return eosStreamProcessor;
----

<1> Setup your clients as per normal.
A Producer is only required if using the `produce` flows.
<2> Choose your ordering type, `KEY` in this case.
@@ -492,6 +509,44 @@ You have the option to either use callbacks to be notified of events, or use the

In future versions, we plan to look at supporting other streaming systems like https://github.com/ReactiveX/RxJava[RxJava] via modules.

[[batching]]
=== Batching

The library also supports a batch version of the API.
Using this, you can process batches of messages at once.

To use it, use one of the `batch` versions of the API instead of the single-message ones.

IMPORTANT: If an exception is thrown while processing the batch, all messages in the batch will be returned to the queue, to be retried with the standard retry system.
There is no guarantee that the messages will be retried again in the same batch.

==== Usage

[source,java,indent=0]
----
ParallelStreamProcessor<String, String> parallelConsumer =
        ParallelStreamProcessor.createEosStreamProcessor(ParallelConsumerOptions.<String, String>builder()
                .consumer(getKafkaConsumer())
                .producer(getKafkaProducer())
                .maxConcurrency(100)
                .batchSize(5) // <1>
                .build());
parallelConsumer.pollBatch(batchOfRecords -> {
    // convert the batch into the payload for our processing
    List<String> payload = batchOfRecords.stream()
            .map(this::preparePayload)
            .collect(Collectors.toList());
    // process the entire batch payload at once
    processBatchPayload(payload);
});
----

<1> Choose your batch size.
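The payload-preparation step above is ordinary Java stream mapping. A self-contained sketch of just that step, with the record and payload types simplified to strings (`preparePayload` here is a hypothetical stand-in for your own conversion logic, not part of the library):

[source,java]
----
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class BatchPayloadDemo {

    // Hypothetical stand-in for preparePayload: formats one record's value.
    static String preparePayload(String recordValue) {
        return "payload:" + recordValue;
    }

    public static void main(String[] args) {
        List<String> batchOfRecords = Arrays.asList("a", "b", "c");
        // convert the batch into the payload for processing, as in the example above
        List<String> payload = batchOfRecords.stream()
                .map(BatchPayloadDemo::preparePayload)
                .collect(Collectors.toList());
        System.out.println(payload); // [payload:a, payload:b, payload:c]
    }
}
----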

==== Restrictions

- If using a batch version of the API, you must choose a batch size in the options class.
- If a batch size is chosen, the "normal" APIs cannot be used, and an error will be thrown.

[[http-with-vertx]]
=== HTTP with the Vert.x Module

@@ -504,6 +559,7 @@ In future versions, we plan to look at supporting other streaming systems like h
return new RequestInfo("localhost", port, "/api", params); // <1>
});
----

<1> Simply return an object representing the request; the Vert.x HTTP engine will handle the rest, using its non-blocking engine

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and its test.
@@ -555,6 +611,7 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb
});
}
----

<1> Setup your Kafka Streams stage as per normal, performing any type of preprocessing in Kafka Streams
<2> For the slow consumer part of your Topology, drop down into the parallel consumer, and use massive concurrency

@@ -830,7 +887,6 @@ For features in development, have a look at the https://github.com/confluentinc/
* https://github.com/confluentinc/parallel-consumer/issues/24[Distributed rate limiting]
* https://github.com/confluentinc/parallel-consumer/issues/27[Metrics]
* https://github.com/confluentinc/parallel-consumer/issues/65[More customisable handling] of HTTP interactions
* Chance to https://github.com/confluentinc/parallel-consumer/issues/18[batch multiple consumer records] into a single or multiple http request objects

=== Long Term - The future ☁️

@@ -909,8 +965,8 @@ Run a specific integration test method in a submodule project, skipping unit tests
`mvn -Dit.test=TransactionAndCommitModeTest#testLowMaxPoll -DskipUTs=true verify -DfailIfNoTests=false --projects parallel-consumer-core`

Run `git bisect` to find a bad commit, edit the Maven command in `bisect.sh` and run::

[source,bash]
----
git bisect start <bad> <good>
git bisect run ./bisect.sh
----
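A minimal sketch of what `bisect.sh` could look like; the Maven command shown is illustrative, and the repository's actual script may differ:

[source,bash]
----
#!/usr/bin/env bash
# git bisect run invokes this script on each candidate commit:
# exit status 0 marks the commit good, 1-124 mark it bad.

run_check() {
  # Illustrative placeholder; the real script would run the failing check, e.g.:
  #   mvn -Dit.test=TransactionAndCommitModeTest#testLowMaxPoll -DskipUTs=true verify
  true
}

run_check  # the script's exit status is the check's exit status
----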
7 changes: 7 additions & 0 deletions bin/build-parallel-consumer-core-without-tests.sh
@@ -0,0 +1,7 @@
#!/usr/bin/env bash
#
# Copyright (C) 2020-2022 Confluent, Inc.
#

export JAVA_HOME=$(/usr/libexec/java_home -v13)
mvn clean install -pl parallel-consumer-core -Dmaven.test.skip=true
7 changes: 7 additions & 0 deletions bin/build-without-tests.sh
@@ -0,0 +1,7 @@
#!/usr/bin/env bash
#
# Copyright (C) 2020-2022 Confluent, Inc.
#

export JAVA_HOME=$(/usr/libexec/java_home -v13)
mvn clean install -Dmaven.test.skip=true
7 changes: 7 additions & 0 deletions bin/deploy.sh
@@ -0,0 +1,7 @@
#!/usr/bin/env bash
#
# Copyright (C) 2020-2022 Confluent, Inc.
#

export JAVA_HOME=$(/usr/libexec/java_home -v13)
mvn deploy
6 changes: 0 additions & 6 deletions parallel-consumer-core/pom.xml
@@ -35,13 +35,11 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -62,13 +60,11 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>me.tongfei</groupId>
<artifactId>progressbar</artifactId>
<version>0.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -100,12 +96,10 @@
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger</artifactId>
<version>0.6</version>
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger-slf4j-backend</artifactId>
<version>0.6</version>
</dependency>
</dependencies>

@@ -1,17 +1,23 @@
package io.confluent.csid.utils;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import lombok.experimental.UtilityClass;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.time.Duration.ofMillis;

@UtilityClass
public class JavaUtils {

public static <T> Optional<T> getLast(final List<T> commitHistory) {
if (commitHistory.isEmpty()) return Optional.empty();
return Optional.of(commitHistory.get(commitHistory.size() - 1));
Expand All @@ -23,4 +29,10 @@ public static <T> Optional<T> getOnlyOne(final Map<String, T> stringMapMap) {
if (values.size() > 1) throw new InternalRuntimeError("More than one element");
return Optional.of(values.iterator().next());
}

public static Duration max(Duration left, Duration right) {
long maxMillis = Math.max(left.toMillis(), right.toMillis());
return ofMillis(maxMillis);
}

}
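The new `max` helper simply compares the two values at millisecond precision. A standalone sketch of the same logic (the demo class name is illustrative):

[source,java]
----
import java.time.Duration;

public class DurationMaxDemo {

    // Mirrors JavaUtils.max: returns the longer of two Durations,
    // compared at millisecond precision.
    static Duration max(Duration left, Duration right) {
        return Duration.ofMillis(Math.max(left.toMillis(), right.toMillis()));
    }

    public static void main(String[] args) {
        System.out.println(max(Duration.ofSeconds(2), Duration.ofMillis(500))); // PT2S
    }
}
----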
@@ -1,16 +1,18 @@
package io.confluent.csid.utils;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

/**
* Simple identifier tuple for Topic Partitions
*/
public final class KafkaUtils {

// todo rename
public static TopicPartition toTP(ConsumerRecord<?,?> rec) {
public static TopicPartition toTopicPartition(ConsumerRecord<?, ?> rec) {
return new TopicPartition(rec.topic(), rec.partition());
}

@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;