diff --git a/README.adoc b/README.adoc index b1c83ac08..1e6a4a861 100644 --- a/README.adoc +++ b/README.adoc @@ -23,12 +23,11 @@ :base_url: https://github.com/confluentinc/{github_name} :issues_link: {base_url}/issues -// dynamic include base for editing in IDEA +// dynamic include base for rendering :project_root: ./ -// uncomment the following if not using IDEA or having issues, for editing the template to see the includes -// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e. -// leave it commented out when committing work) +// uncomment the following if IDEA or having issues, for editing the template to see the includes +// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e. leave it commented out when committing work) //:project_root: ../../ @@ -189,9 +188,12 @@ The end-to-end latency of the responses to these answers needs to be as low as t * Kafka Streams app that had a slow stage ** We use Kafka Streams for our message processing, but one of it's steps have characteristics of the above and we need better performance. We can break out as described below into the tool for processing that step, then return to the Kafka Streams context. +** Message load spikes causing heavy hot spots on join operations where a disproportionate quantity of messages land on one partition. +*** If keys cannot be adjusted or the topology modified to better distribute messages across partitions - consider performing a <>. * Provisioning extra machines (either virtual machines or real machines) to run multiple clients has a cost, using this library instead avoids the need for extra instances to be deployed in any respect. + == Feature List * Have massively parallel consumption processing without running hundreds or thousands of ** Kafka consumer clients @@ -488,6 +490,7 @@ In future versions, we plan to look at supporting other streaming systems like h See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and it's test. + [[streams-usage-code]] === Kafka Streams Concurrent Processing @@ -535,6 +538,51 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and it's test. + +[[parallel-joins]] +==== Parallel Joins + +PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store. KS state stores can be read by external threads (external to the toplogy context), and so they can be reference in the processing function of PC. + +image::https://lucid.app/publicSegments/view/d144c027-653c-4e77-bfa4-8ecaadba1385/image.png[] + +CAUTION: Although using a `GlobalKTable` is not strictly necessary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being collocated when using a sharded `KTable` may negate the performance benefits in some scenarios. +For this reason, a `GlobalKTable` is recommended. + +.Parallel joins with Kafka Streams +[source,java,indent=0] +---- + /** + * Needs KeyValueStore injected. + */ + ParallelJoin(KeyValueStore store, ParallelConsumer pc) { + pc.poll(record -> { + UserId userId = record.key(); + UserEvent userEvent = record.value(); + + UserProfile userProfile = store.get(userId); + if (userProfile != null) { // <1> + // join hit + // create payload with even details and call third party system, or produce a result message + userEvent.getEventPayload(); + //.... + } else { // <2> + // join miss + // drop - not registered devices for that user + } + }); + } +---- +<1> If no matching item is present in the state store for the key, then it's a miss. +<2> If lookup is not null, join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProdce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scenarios. + +See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[Parallel join KS example] class. + +WARNING: Performing a join outside of KS relinquishes ordering efforts KS applies to populating each side of the join - i.e. there is no effort to apply any ordering to the corresponding sides of this join. +When joining within KS, this is taken care of for you. +Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store vs arriving from the event stream. + + [[ordering-guarantees]] == Ordering Guarantees @@ -656,7 +704,7 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Pr However, any given preprocessing can be done in KS, preparing the messages. One can then use this library to consume from an input topic, produced by KS to process the messages in parallel. -For a code example, see the <> section. +For a code example, see the <> and <> section. .Example usage with Kafka Streams image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"] diff --git a/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java new file mode 100644 index 000000000..8c73b8ea6 --- /dev/null +++ b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java @@ -0,0 +1,36 @@ +package io.confluent.parallelconsumer.examples.streams; + +/*- + * Copyright (C) 2020 Confluent, Inc. + */ + + +import io.confluent.parallelconsumer.ParallelConsumer; +import org.apache.kafka.streams.state.KeyValueStore; + +public class ParallelJoin { + + // tag::example[] + /** + * Needs KeyValueStore injected. + */ + ParallelJoin(KeyValueStore store, ParallelConsumer pc) { + pc.poll(record -> { + UserId userId = record.key(); + UserEvent userEvent = record.value(); + + UserProfile userProfile = store.get(userId); + if (userProfile != null) { // <1> + // join hit + // create payload with even details and call third party system, or produce a result message + userEvent.getEventPayload(); + //.... + } else { // <2> + // join miss + // drop - not registered devices for that user + } + }); + } + // end::example[] + +} diff --git a/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserEvent.java b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserEvent.java new file mode 100644 index 000000000..ea0aef4d9 --- /dev/null +++ b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserEvent.java @@ -0,0 +1,13 @@ +package io.confluent.parallelconsumer.examples.streams; + +/*- + * Copyright (C) 2020 Confluent, Inc. + */ + + +import lombok.Value; + +@Value +public class UserEvent { + String eventPayload; +} diff --git a/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserId.java b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserId.java new file mode 100644 index 000000000..02e5367e9 --- /dev/null +++ b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserId.java @@ -0,0 +1,9 @@ +package io.confluent.parallelconsumer.examples.streams; + +/*- + * Copyright (C) 2020 Confluent, Inc. + */ + + +public class UserId { +} diff --git a/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserProfile.java b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserProfile.java new file mode 100644 index 000000000..6adfa5e60 --- /dev/null +++ b/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/UserProfile.java @@ -0,0 +1,9 @@ +package io.confluent.parallelconsumer.examples.streams; + +/*- + * Copyright (C) 2020 Confluent, Inc. + */ + + +public class UserProfile { +} diff --git a/pom.xml b/pom.xml index a39ef8d2a..905063536 100644 --- a/pom.xml +++ b/pom.xml @@ -377,7 +377,7 @@ ${mycila.version} false - + true diff --git a/src/docs/README.adoc b/src/docs/README.adoc index 607128bde..a00bf6c13 100644 --- a/src/docs/README.adoc +++ b/src/docs/README.adoc @@ -23,12 +23,11 @@ :base_url: https://github.com/confluentinc/{github_name} :issues_link: {base_url}/issues -// dynamic include base for editing in IDEA +// dynamic include base for rendering :project_root: ./ -// uncomment the following if not using IDEA or having issues, for editing the template to see the includes -// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e. -// leave it commented out when committing work) +// uncomment the following if IDEA or having issues, for editing the template to see the includes +// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e. leave it commented out when committing work) //:project_root: ../../ @@ -187,9 +186,12 @@ The end-to-end latency of the responses to these answers needs to be as low as t * Kafka Streams app that had a slow stage ** We use Kafka Streams for our message processing, but one of it's steps have characteristics of the above and we need better performance. We can break out as described below into the tool for processing that step, then return to the Kafka Streams context. +** Message load spikes causing heavy hot spots on join operations where a disproportionate quantity of messages land on one partition. +*** If keys cannot be adjusted or the topology modified to better distribute messages across partitions - consider performing a <>. * Provisioning extra machines (either virtual machines or real machines) to run multiple clients has a cost, using this library instead avoids the need for extra instances to be deployed in any respect. + == Feature List * Have massively parallel consumption processing without running hundreds or thousands of ** Kafka consumer clients @@ -439,6 +441,7 @@ include::{project_root}/parallel-consumer-examples/parallel-consumer-example-ver See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and it's test. + [[streams-usage-code]] === Kafka Streams Concurrent Processing @@ -457,6 +460,32 @@ include::{project_root}/parallel-consumer-examples/parallel-consumer-example-str See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and it's test. + +[[parallel-joins]] +==== Parallel Joins + +PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store. KS state stores can be read by external threads (external to the toplogy context), and so they can be reference in the processing function of PC. + +image::https://lucid.app/publicSegments/view/d144c027-653c-4e77-bfa4-8ecaadba1385/image.png[] + +CAUTION: Although using a `GlobalKTable` is not strictly necessary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being collocated when using a sharded `KTable` may negate the performance benefits in some scenarios. +For this reason, a `GlobalKTable` is recommended. + +.Parallel joins with Kafka Streams +[source,java,indent=0] +---- +include::{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[tag=example] +---- +<1> If no matching item is present in the state store for the key, then it's a miss. +<2> If lookup is not null, join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProdce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scenarios. + +See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[Parallel join KS example] class. + +WARNING: Performing a join outside of KS relinquishes ordering efforts KS applies to populating each side of the join - i.e. there is no effort to apply any ordering to the corresponding sides of this join. +When joining within KS, this is taken care of for you. +Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store vs arriving from the event stream. + + [[ordering-guarantees]] == Ordering Guarantees @@ -578,7 +607,7 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Pr However, any given preprocessing can be done in KS, preparing the messages. One can then use this library to consume from an input topic, produced by KS to process the messages in parallel. -For a code example, see the <> section. +For a code example, see the <> and <> section. .Example usage with Kafka Streams image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]