58 changes: 53 additions & 5 deletions README.adoc
@@ -23,12 +23,11 @@
:base_url: https://github.com/confluentinc/{github_name}
:issues_link: {base_url}/issues

// dynamic include base for editing in IDEA
// dynamic include base for rendering
:project_root: ./

// uncomment the following if not using IDEA or having issues, for editing the template to see the includes
// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e.
// leave it commented out when committing work)
// uncomment the following if using IDEA or having issues, to see the includes while editing the template
// note that with this line not commented out, the rendering of the root level AsciiDoc file will be incorrect (i.e. leave it commented out when committing work)
//:project_root: ../../


@@ -189,9 +188,12 @@ The end-to-end latency of the responses to these answers needs to be as low as t
* Kafka Streams app that had a slow stage
** We use Kafka Streams for our message processing, but one of its steps has the characteristics described above and we need better performance.
We can break out as described below into the tool for processing that step, then return to the Kafka Streams context.
** Message load spikes causing heavy hot spots on join operations where a disproportionate quantity of messages land on one partition.
*** If keys cannot be adjusted or the topology modified to better distribute messages across partitions, consider performing a <<parallel-joins,parallel join operation>>.
* Provisioning extra machines (either virtual machines or real machines) to run multiple clients has a cost; using this library instead avoids the need to deploy any extra instances.



== Feature List
* Have massively parallel consumption processing without running hundreds or thousands of
** Kafka consumer clients
@@ -488,6 +490,7 @@ In future versions, we plan to look at supporting other streaming systems like h

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and its test.


[[streams-usage-code]]
=== Kafka Streams Concurrent Processing

@@ -535,6 +538,51 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and its test.


[[parallel-joins]]
==== Parallel Joins

PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store. KS state stores can be read by external threads (external to the topology context), and so they can be referenced in the processing function of PC.

image::https://lucid.app/publicSegments/view/d144c027-653c-4e77-bfa4-8ecaadba1385/image.png[]

CAUTION: Although using a `GlobalKTable` is not strictly necessary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being collocated when using a sharded `KTable` may negate the performance benefits in some scenarios.
For this reason, a `GlobalKTable` is recommended.

.Parallel joins with Kafka Streams
[source,java,indent=0]
----
/**
 * Needs KeyValueStore injected.
 */
ParallelJoin(KeyValueStore<UserId, UserProfile> store, ParallelConsumer<UserId, UserEvent> pc) {
    pc.poll(record -> {
        UserId userId = record.key();
        UserEvent userEvent = record.value();

        UserProfile userProfile = store.get(userId);
        if (userProfile != null) { // <1>
            // join hit
            // create payload with event details and call third party system, or produce a result message
            userEvent.getEventPayload();
            //....
        } else { // <2>
            // join miss
            // drop - no registered devices for that user
        }
    });
}
----
<1> If the lookup is not null, it's a hit: join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProduce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scenarios.
<2> If no matching item is present in the state store for the key, then it's a miss.

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[Parallel join KS example] class.

WARNING: Performing a join outside of KS gives up the ordering effort KS applies when populating each side of the join, i.e. nothing orders the two sides of this join relative to each other.
When joining within KS, this is taken care of for you.
Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store versus arriving from the event stream.
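
The ordering caveat above can be illustrated with a minimal, self-contained sketch. It assumes a plain `ConcurrentHashMap` as a stand-in for the KS state store, and the names `JoinOrderingSketch` and `join` are hypothetical (they are not part of PC or KS). The same event yields a miss or a hit depending only on whether the profile reached the store first.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class JoinOrderingSketch {

    // Stand-in for a KS state store (assumption); a real store is populated by the topology.
    public static final Map<String, String> store = new ConcurrentHashMap<>();

    /** Joins an event against the store; an empty result means a join miss. */
    public static Optional<String> join(String userId, String eventPayload) {
        String profile = store.get(userId);
        if (profile == null) {
            return Optional.empty(); // join miss: profile not (yet) in the store
        }
        return Optional.of(profile + ":" + eventPayload); // join hit
    }

    public static void main(String[] args) {
        // The event arrives before the profile has been loaded: miss.
        System.out.println("hit before load: " + join("user-1", "login").isPresent());

        store.put("user-1", "profile-1");

        // The same event after the store has been populated: hit.
        System.out.println("hit after load: " + join("user-1", "login").isPresent());
    }
}
```

If a miss of this kind is not acceptable, the event would need to be retried or delayed rather than dropped; which of those fits is application specific.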

Contributor

@JorgenRingen JorgenRingen Mar 3, 2021

Time synchronization of joins in KS isn't done for GlobalKTables (and interactive queries?) anyway 🤔

> Another important difference between a KTable and a GlobalKTable is time synchronization: while processing KTable records is time synchronized based on record timestamps to all other streams, a GlobalKTable is not time synchronized.

https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

Do you know if there is any way to make sure that a GlobalKTable has been populated before parallel-consumer processing starts, to avoid or minimize the chance of join misses? GlobalKTables are populated in a separate thread. It's usually super-fast, but I have had some issues with join misses in KS for stream-globaltable joins during startup when the globaltable is very large.

Contributor Author

GKTs work a bit differently: they "fully" hydrate on application start. That is, the head offset of the input topic is marked at start, then the topic is loaded up to that point, and after that it's just whatever hits first. So, there is some effort there - just trying to point out it's different, and should be understood.

Ah yes, trying to sync with PC separately... 🤔 That's a great question. You could do something janky like read the head message, and wait until that can be queried from the GKT... Otherwise, ideally you could hook into an event listener system to know when GKT bootstrap has finished. I guess there isn't anything for this currently. Oh actually - it might be represented in the run state of KS - that's worth looking into. Thanks for the heads up!
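
The "read the head message and wait until it can be queried" idea could be sketched roughly as below. This is only an illustration under stated assumptions: `StoreReadiness` and `awaitKey` are hypothetical names, a `ConcurrentHashMap` stands in for the GKT-backed store, and the lookup is passed in as a plain `Function` so no KS API is assumed. Whether the KS run state already signals a completed bootstrap is not settled here.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class StoreReadiness {

    /**
     * Polls the lookup until it returns a non-null value for the key, or the timeout elapses.
     * Returns true if the key became visible in time, false on timeout or interruption.
     */
    public static <K, V> boolean awaitKey(Function<K, V> lookup, K key,
                                          Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (lookup.apply(key) != null) {
                return true; // the key is queryable, so the store has caught up to it
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up waiting
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the state store (assumption); a separate thread simulates the bootstrap.
        Map<String, String> store = new ConcurrentHashMap<>();
        Thread loader = new Thread(() -> store.put("head-key", "head-value"));
        loader.start();

        boolean ready = awaitKey(store::get, "head-key", Duration.ofSeconds(5), Duration.ofMillis(10));
        loader.join();
        System.out.println("store ready: " + ready);
    }
}
```

PC processing would then only be started once `awaitKey` returns true. The key to wait for (the record at the head of the input topic when the application starts) has to be obtained separately and is not shown.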


[[ordering-guarantees]]
== Ordering Guarantees

@@ -656,7 +704,7 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Pr
However, any given preprocessing can be done in KS, preparing the messages.
One can then use this library to consume from an input topic, produced by KS to process the messages in parallel.

For a code example, see the <<streams-usage-code>> section.
For a code example, see the <<streams-usage-code>> and <<parallel-joins>> sections.

.Example usage with Kafka Streams
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]
@@ -0,0 +1,36 @@
package io.confluent.parallelconsumer.examples.streams;

/*-
 * Copyright (C) 2020 Confluent, Inc.
 */


import io.confluent.parallelconsumer.ParallelConsumer;
import org.apache.kafka.streams.state.KeyValueStore;

public class ParallelJoin {

    // tag::example[]
    /**
     * Needs KeyValueStore injected.
     */
    ParallelJoin(KeyValueStore<UserId, UserProfile> store, ParallelConsumer<UserId, UserEvent> pc) {
        pc.poll(record -> {
            UserId userId = record.key();
            UserEvent userEvent = record.value();

            UserProfile userProfile = store.get(userId);
            if (userProfile != null) { // <1>
                // join hit
                // create payload with event details and call third party system, or produce a result message
                userEvent.getEventPayload();
                //....
            } else { // <2>
                // join miss
                // drop - no registered devices for that user
            }
        });
    }
    // end::example[]

}
@@ -0,0 +1,13 @@
package io.confluent.parallelconsumer.examples.streams;

/*-
 * Copyright (C) 2020 Confluent, Inc.
 */


import lombok.Value;

@Value
public class UserEvent {
    String eventPayload;
}
@@ -0,0 +1,9 @@
package io.confluent.parallelconsumer.examples.streams;

/*-
 * Copyright (C) 2020 Confluent, Inc.
 */


public class UserId {
}
@@ -0,0 +1,9 @@
package io.confluent.parallelconsumer.examples.streams;

/*-
 * Copyright (C) 2020 Confluent, Inc.
 */


public class UserProfile {
}
2 changes: 1 addition & 1 deletion pom.xml
@@ -377,7 +377,7 @@
<version>${mycila.version}</version>
<inherited>false</inherited>
<!-- To apply the license run: -->
<!-- ↪ mvn com.mycila:license-maven-plugin:format-->
<!-- ↪ mvn com.mycila:license-maven-plugin:format -->
<configuration>
<aggregate>true</aggregate>
<licenseSets>
39 changes: 34 additions & 5 deletions src/docs/README.adoc
@@ -23,12 +23,11 @@
:base_url: https://github.com/confluentinc/{github_name}
:issues_link: {base_url}/issues

// dynamic include base for editing in IDEA
// dynamic include base for rendering
:project_root: ./

// uncomment the following if not using IDEA or having issues, for editing the template to see the includes
// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e.
// leave it commented out when committing work)
// uncomment the following if using IDEA or having issues, to see the includes while editing the template
// note that with this line not commented out, the rendering of the root level AsciiDoc file will be incorrect (i.e. leave it commented out when committing work)
//:project_root: ../../


@@ -187,9 +186,12 @@ The end-to-end latency of the responses to these answers needs to be as low as t
* Kafka Streams app that had a slow stage
** We use Kafka Streams for our message processing, but one of its steps has the characteristics described above and we need better performance.
We can break out as described below into the tool for processing that step, then return to the Kafka Streams context.
** Message load spikes causing heavy hot spots on join operations where a disproportionate quantity of messages land on one partition.
*** If keys cannot be adjusted or the topology modified to better distribute messages across partitions, consider performing a <<parallel-joins,parallel join operation>>.
* Provisioning extra machines (either virtual machines or real machines) to run multiple clients has a cost; using this library instead avoids the need to deploy any extra instances.



== Feature List
* Have massively parallel consumption processing without running hundreds or thousands of
** Kafka consumer clients
@@ -439,6 +441,7 @@ include::{project_root}/parallel-consumer-examples/parallel-consumer-example-ver

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and its test.


[[streams-usage-code]]
=== Kafka Streams Concurrent Processing

@@ -457,6 +460,32 @@ include::{project_root}/parallel-consumer-examples/parallel-consumer-example-str

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and its test.


[[parallel-joins]]
==== Parallel Joins

PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store. KS state stores can be read by external threads (external to the topology context), and so they can be referenced in the processing function of PC.

image::https://lucid.app/publicSegments/view/d144c027-653c-4e77-bfa4-8ecaadba1385/image.png[]

CAUTION: Although using a `GlobalKTable` is not strictly necessary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being collocated when using a sharded `KTable` may negate the performance benefits in some scenarios.
For this reason, a `GlobalKTable` is recommended.

.Parallel joins with Kafka Streams
[source,java,indent=0]
----
include::{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[tag=example]
----
<1> If the lookup is not null, it's a hit: join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProduce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scenarios.
<2> If no matching item is present in the state store for the key, then it's a miss.

See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[Parallel join KS example] class.

WARNING: Performing a join outside of KS gives up the ordering effort KS applies when populating each side of the join, i.e. nothing orders the two sides of this join relative to each other.
When joining within KS, this is taken care of for you.
Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store versus arriving from the event stream.


[[ordering-guarantees]]
== Ordering Guarantees

@@ -578,7 +607,7 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Pr
However, any given preprocessing can be done in KS, preparing the messages.
One can then use this library to consume from an input topic, produced by KS to process the messages in parallel.

For a code example, see the <<streams-usage-code>> section.
For a code example, see the <<streams-usage-code>> and <<parallel-joins>> sections.

.Example usage with Kafka Streams
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]