Commit 1317fe7

committed
edit
add scenario edit
1 parent 429246d commit 1317fe7

3 files changed

+90
-43
lines changed

README.adoc

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@
2323
:base_url: https://github.com/confluentinc/{github_name}
2424
:issues_link: {base_url}/issues
2525

26-
// dynamic include base for editing in IDEA
26+
// dynamic include base for rendering
2727
:project_root: ./
2828

29-
// uncomment the following if not using IDEA or having issues, for editing the template to see the includes
30-
// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e.
31-
// leave it commented out when committing work)
29+
// uncomment the following if using IDEA or having issues, for editing the template to see the includes
30+
// note that with this line not commented out, the rendering of the root level asciidoc file will be incorrect (i.e. leave it commented out when committing work)
3231
//:project_root: ../../
3332

3433

@@ -189,9 +188,12 @@ The end-to-end latency of the responses to these answers needs to be as low as t
189188
* Kafka Streams app that had a slow stage
190189
** We use Kafka Streams for our message processing, but one of its steps has characteristics of the above and we need better performance.
191190
We can break out as described below into the tool for processing that step, then return to the Kafka Streams context.
191+
** Message load spikes causing heavy hot spots on join operations where a disproportionate quantity of messages land on one partition.
192+
*** If keys cannot be adjusted or the topology modified to better distribute messages across partitions - consider performing a <<parallel-joins,parallel join operation>>.
192193
* Provisioning extra machines (either virtual machines or real machines) to run multiple clients has a cost, using this library instead avoids the need for extra instances to be deployed in any respect.
193194

194195

196+
195197
== Feature List
196198
* Have massively parallel consumption processing without running hundreds or thousands of
197199
** Kafka consumer clients
@@ -488,6 +490,7 @@ In future versions, we plan to look at supporting other streaming systems like h
488490

489491
See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and its test.
490492

493+
491494
[[streams-usage-code]]
492495
=== Kafka Streams Concurrent Processing
493496

@@ -496,22 +499,6 @@ Use your Streams app to process your data first, then send anything needed to be
496499
.Example usage with Kafka Streams
497500
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]
498501

499-
==== Parallel Joins
500-
501-
PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store.
502-
503-
KS state stores can be read by external threads (external to the toplogy context), and so they can be reference in the processing function of PC.
504-
If no item is present in the state store for the key, then it's a miss.
505-
If there is, you can join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProdce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scnearios.
506-
507-
image::https://lucid.app/publicSegments/view/d144c027-653c-4e77-bfa4-8ecaadba1385/image.png[]
508-
509-
WARNING::Although using a `GlobalKTable` is not strictly nesescary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being colocated when using a sharded `KTable` may negate the performance benefits in some scenarios.
510-
For this reason, a `GlobalKTable` is recommended.
511-
512-
WARNING::Performing a join outsode of KS relinquishes ordering efforts KS applies to populating each side of the join.
513-
Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store vs arriving on the event stream.
514-
515502
.Preprocess in Kafka Streams, then process concurrently
516503
[source,java,indent=0]
517504
----
@@ -551,6 +538,51 @@ Be careful using this technique if your operation is sensitive to the order in w
551538

552539
See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and its test.
553540

541+
542+
[[parallel-joins]]
543+
==== Parallel Joins
544+
545+
PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store. KS state stores can be read by external threads (external to the topology context), and so they can be referenced in the processing function of PC.
546+
547+
image::https://lucid.app/publicSegments/view/d144c027-653c-4e77-bfa4-8ecaadba1385/image.png[]
548+
549+
CAUTION: Although using a `GlobalKTable` is not strictly necessary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being collocated when using a sharded `KTable` may negate the performance benefits in some scenarios.
550+
For this reason, a `GlobalKTable` is recommended.
551+
552+
.Parallel joins with Kafka Streams
553+
[source,java,indent=0]
554+
----
555+
/**
556+
* Needs KeyValueStore injected.
557+
*/
558+
ParallelJoin(KeyValueStore<UserId, UserProfile> store, ParallelConsumer<UserId, UserEvent> pc) {
559+
pc.poll(record -> {
560+
UserId userId = record.key();
561+
UserEvent userEvent = record.value();
562+
563+
UserProfile userProfile = store.get(userId);
564+
if (userProfile != null) { // <1>
565+
// join hit
566+
// create payload with event details and call third party system, or produce a result message
567+
userEvent.getEventPayload();
568+
//....
569+
} else { // <2>
570+
// join miss
571+
// drop - not registered devices for that user
572+
}
573+
});
574+
}
575+
----
576+
<1> If the lookup is not null, it's a hit: join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProduce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scenarios.
577+
<2> If no matching item is present in the state store for the key, then it's a miss.
578+
579+
See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[Parallel join KS example] class.
580+
581+
WARNING: Performing a join outside of KS relinquishes ordering efforts KS applies to populating each side of the join - i.e. there is no effort to apply any ordering to the corresponding sides of this join.
582+
When joining within KS, this is taken care of for you.
583+
Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store vs arriving from the event stream.
584+
585+
554586
[[ordering-guarantees]]
555587
== Ordering Guarantees
556588

@@ -672,7 +704,7 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Pr
672704
However, any given preprocessing can be done in KS, preparing the messages.
673705
One can then use this library to consume from an input topic, produced by KS to process the messages in parallel.
674706

675-
For a code example, see the <<streams-usage-code>> section.
707+
For a code example, see the <<streams-usage-code>> and <<parallel-joins>> sections.
676708

677709
.Example usage with Kafka Streams
678710
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]
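
The ordering warning in the Parallel Joins section can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: `JoinOrderingSketch` is a hypothetical class, and a plain `ConcurrentHashMap` stands in for the KS state store, so no Kafka classes are involved. It only shows that the same event produces a different join result depending on whether the store was populated before or after the event was processed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration only; a map stands in for the KS state store. */
final class JoinOrderingSketch {

    /** Looks up the key and reports whether the join hit or missed. */
    static String join(Map<String, String> store, String userId) {
        String profile = store.get(userId);
        return profile != null ? "hit:" + profile : "miss";
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>();

        // The event is processed before the profile reaches the store.
        String early = join(store, "alice");

        // The profile arrives later; the same event would now join.
        store.put("alice", "profile-a");
        String late = join(store, "alice");

        System.out.println(early + " then " + late); // prints "miss then hit:profile-a"
    }
}
```

If the operation cannot tolerate the early miss, the join likely needs to stay inside KS, or the miss branch needs its own handling (for example a retry), which is outside the scope of this sketch.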

parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
public class ParallelJoin {
1212

13+
// tag::example[]
1314
/**
1415
* Needs KeyValueStore injected.
1516
*/
@@ -18,17 +19,18 @@ public class ParallelJoin {
1819
UserId userId = record.key();
1920
UserEvent userEvent = record.value();
2021

21-
UserProfile userDeviceTokenRegistry = store.get(userId);
22-
if (userDeviceTokenRegistry != null) {
22+
UserProfile userProfile = store.get(userId);
23+
if (userProfile != null) { // <1>
2324
// join hit
2425
// create payload with event details and call third party system, or produce a result message
2526
userEvent.getEventPayload();
2627
//....
27-
} else {
28+
} else { // <2>
2829
// join miss
2930
// drop - not registered devices for that user
3031
}
3132
});
3233
}
34+
// end::example[]
3335

3436
}
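
The hit and miss branches of the join above can be exercised without a Kafka cluster. The following is a minimal sketch under stated assumptions, not the library's API: `ParallelJoinSketch` and its nested `UserEvent` are hypothetical names, a `ConcurrentHashMap` stands in for the injected `KeyValueStore`, and a fixed thread pool stands in for the concurrency that PC provides.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the join in ParallelJoin.java; no Kafka classes are used. */
final class ParallelJoinSketch {

    /** Hypothetical event type: the join key plus a payload. */
    static final class UserEvent {
        final String userId;
        final String payload;

        UserEvent(String userId, String payload) {
            this.userId = userId;
            this.payload = payload;
        }
    }

    /** Joins every event against the store on a thread pool; returns {hits, misses}. */
    static int[] join(Map<String, String> store, List<UserEvent> events, int threads) {
        AtomicInteger hits = new AtomicInteger();
        AtomicInteger misses = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (UserEvent event : events) {
            pool.submit(() -> {
                String profile = store.get(event.userId);
                if (profile != null) {
                    hits.incrementAndGet();   // join hit: build the joined payload here
                } else {
                    misses.incrementAndGet(); // join miss: drop the event
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("join tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for join tasks", e);
        }
        return new int[] {hits.get(), misses.get()};
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>(
                Map.of("alice", "profile-a", "bob", "profile-b"));
        List<UserEvent> events = List.of(
                new UserEvent("alice", "login"),
                new UserEvent("carol", "login"),
                new UserEvent("bob", "purchase"));
        int[] result = join(store, events, 4);
        System.out.println("hits=" + result[0] + " misses=" + result[1]);
    }
}
```

The lookups are read-only, so the tasks can run in any order; only the counters need to be thread safe, hence `AtomicInteger`.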

src/docs/README.adoc

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@
2323
:base_url: https://github.com/confluentinc/{github_name}
2424
:issues_link: {base_url}/issues
2525

26-
// dynamic include base for editing in IDEA
26+
// dynamic include base for rendering
2727
:project_root: ./
2828

29-
// uncomment the following if not using IDEA or having issues, for editing the template to see the includes
30-
// note that with this line not commented out, the rendering of the root level asiidoc file will be incorrect (i.e.
31-
// leave it commented out when committing work)
29+
// uncomment the following if using IDEA or having issues, for editing the template to see the includes
30+
// note that with this line not commented out, the rendering of the root level asciidoc file will be incorrect (i.e. leave it commented out when committing work)
3231
//:project_root: ../../
3332

3433

@@ -187,9 +186,12 @@ The end-to-end latency of the responses to these answers needs to be as low as t
187186
* Kafka Streams app that had a slow stage
188187
** We use Kafka Streams for our message processing, but one of its steps has characteristics of the above and we need better performance.
189188
We can break out as described below into the tool for processing that step, then return to the Kafka Streams context.
189+
** Message load spikes causing heavy hot spots on join operations where a disproportionate quantity of messages land on one partition.
190+
*** If keys cannot be adjusted or the topology modified to better distribute messages across partitions - consider performing a <<parallel-joins,parallel join operation>>.
190191
* Provisioning extra machines (either virtual machines or real machines) to run multiple clients has a cost, using this library instead avoids the need for extra instances to be deployed in any respect.
191192

192193

194+
193195
== Feature List
194196
* Have massively parallel consumption processing without running hundreds or thousands of
195197
** Kafka consumer clients
@@ -439,6 +441,7 @@ include::{project_root}/parallel-consumer-examples/parallel-consumer-example-ver
439441

440442
See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java[Vert.x example] project, and its test.
441443

444+
442445
[[streams-usage-code]]
443446
=== Kafka Streams Concurrent Processing
444447

@@ -447,31 +450,41 @@ Use your Streams app to process your data first, then send anything needed to be
447450
.Example usage with Kafka Streams
448451
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]
449452

450-
==== Parallel Joins
453+
.Preprocess in Kafka Streams, then process concurrently
454+
[source,java,indent=0]
455+
----
456+
include::{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[tag=example]
457+
----
458+
<1> Setup your Kafka Streams stage as per normal, performing any type of preprocessing in Kafka Streams
459+
<2> For the slow consumer part of your Topology, drop down into the parallel consumer, and use massive concurrency
460+
461+
See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and its test.
462+
451463

452-
PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store.
464+
[[parallel-joins]]
465+
==== Parallel Joins
453466

454-
KS state stores can be read by external threads (external to the toplogy context), and so they can be reference in the processing function of PC.
455-
If no item is present in the state store for the key, then it's a miss.
456-
If there is, you can join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProdce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scnearios.
467+
PC can also be used to implement a parallel join against a data source, namely a Kafka Streams (KS) state store. KS state stores can be read by external threads (external to the topology context), and so they can be referenced in the processing function of PC.
457468

458469
image::https://lucid.app/publicSegments/view/d144c027-653c-4e77-bfa4-8ecaadba1385/image.png[]
459470

460-
WARNING::Although using a `GlobalKTable` is not strictly nesescary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being colocated when using a sharded `KTable` may negate the performance benefits in some scenarios.
471+
CAUTION: Although using a `GlobalKTable` is not strictly necessary, the https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html[extra network hop] caused by data not being collocated when using a sharded `KTable` may negate the performance benefits in some scenarios.
461472
For this reason, a `GlobalKTable` is recommended.
462473

463-
WARNING::Performing a join outsode of KS relinquishes ordering efforts KS applies to populating each side of the join.
464-
Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store vs arriving on the event stream.
465-
466-
.Preprocess in Kafka Streams, then process concurrently
474+
.Parallel joins with Kafka Streams
467475
[source,java,indent=0]
468476
----
469-
include::{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[tag=example]
477+
include::{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[tag=example]
470478
----
471-
<1> Setup your Kafka Streams stage as per normal, performing any type of preprocessing in Kafka Streams
472-
<2> For the slow consumer part of your Topology, drop down into the parallel consumer, and use massive concurrency
479+
<1> If the lookup is not null, it's a hit: join the data any way you please, and potentially call an external system with the joined data, or output it back to a Kafka topic using the `#pollAndProduce` API if trying to avoid https://thorben-janssen.com/dual-writes/[dual write] scenarios.
480+
<2> If no matching item is present in the state store for the key, then it's a miss.
481+
482+
See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/ParallelJoin.java[Parallel join KS example] class.
483+
484+
WARNING: Performing a join outside of KS relinquishes ordering efforts KS applies to populating each side of the join - i.e. there is no effort to apply any ordering to the corresponding sides of this join.
485+
When joining within KS, this is taken care of for you.
486+
Be careful using this technique if your operation is sensitive to the order in which data is populated in the state store vs arriving from the event stream.
473487

474-
See the link:{project_root}/parallel-consumer-examples/parallel-consumer-example-streams/src/main/java/io/confluent/parallelconsumer/examples/streams/StreamsApp.java[Kafka Streams example] project, and it's test.
475488

476489
[[ordering-guarantees]]
477490
== Ordering Guarantees
@@ -594,7 +607,7 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Pr
594607
However, any given preprocessing can be done in KS, preparing the messages.
595608
One can then use this library to consume from an input topic, produced by KS to process the messages in parallel.
596609

597-
For a code example, see the <<streams-usage-code>> section.
610+
For a code example, see the <<streams-usage-code>> and <<parallel-joins>> sections.
598611

599612
.Example usage with Kafka Streams
600613
image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fbf/image.png[Kafka Streams Usage, align="center"]
