diff --git a/settings.gradle b/settings.gradle
index 8dddd8e7..10ac2312 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,4 +23,5 @@ include 'hopping-windows:flinksql'
 include 'joining-stream-stream:flinksql'
 include 'splitting:flinksql'
 include 'tumbling-windows:flinksql'
+include 'versioned-ktables:kstreams'
 include 'window-final-result:kstreams'
diff --git a/versioned-ktables/kstreams/README.md b/versioned-ktables/kstreams/README.md
new file mode 100644
index 00000000..1c8f3e50
--- /dev/null
+++ b/versioned-ktables/kstreams/README.md
@@ -0,0 +1,152 @@
+# Versioned KTables for temporal join accuracy
+
+Proper handling of time in Kafka Streams stream-table joins has historically been difficult to achieve. Previously, when
+Kafka Streams executed a stream-table join, the stream-side event would join the latest available record with the same key on the table side.
+But sometimes it's important for the stream event to match up with a table record by timestamp as well as by key.
+Consider a stream of stock transactions and a table of stock prices -- it's essential that the transaction joins with the
+stock price at the time of the transaction, not the latest price. A versioned state store tracks multiple record versions
+for the same key, rather than the single latest record per key, as is the case for standard non-versioned stores.
+
+The key to versioned state stores is to use a `VersionedKeyValueStore` when creating a `KTable`:
+``` java annotate
+    final VersionedBytesStoreSupplier versionedStoreSupplier =
+              Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
+                                                       Duration.ofMinutes(10));
+
+
+    final KTable<String, String> tableInput = builder.table(tableInputTopic,
+                Materialized.<String, String>as(versionedStoreSupplier)
+                        .withKeySerde(stringSerde)
+                        .withValueSerde(stringSerde));
+```
+Assuming you have a versioned `KTable` and a `KStream` with out-of-order records to join, the join will be temporally correct, since each stream record will be joined
+with a table record _aligned by timestamp_ instead of simply using the latest record for the key, as shown in the sketch below.
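The join itself is wired up like any other stream-table join; the temporal alignment comes entirely from the versioned store backing the table. Here is a rough sketch, assuming a `KStream<String, String> streamInput` read from the stream input topic, the `stringSerde` from above, and an output topic name (it mirrors the join in this tutorial's example application):

``` java
// Combine the stream-side value (e.g., "peanut butter and") with the table-side value (e.g., "jelly")
final ValueJoiner<String, String, String> valueJoiner = (streamValue, tableValue) -> streamValue + " " + tableValue;

// With a versioned table, each stream record joins the table version whose timestamp is the
// latest one at or before the stream record's timestamp, not simply the latest version overall.
streamInput.join(tableInput, valueJoiner)
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
```

+## Running the example
+
+You can run the example in this tutorial in one of two ways: locally with Kafka running in Docker, or with Confluent Cloud.
+
+<details>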
+<summary>Running Kafka in Docker</summary>
+
+### Prerequisites
+
+* [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html)
+* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/)
+
+### Start Kafka
+
+* Execute `confluent local kafka start` from a terminal window, and copy the `host:port` output
+* Save the file `confluent.properties.orig` as `confluent.properties` (ignored by git) and update the `bootstrap.servers` config with the value from the previous step
+
+### Create the topics `stream-input-topic`, `table-input-topic` and `output-topic`
+
+* `confluent local kafka topic create stream-input-topic`
+* `confluent local kafka topic create table-input-topic`
+* `confluent local kafka topic create output-topic`
+
+### Start Kafka Streams
+
+* `cd` into `versioned-ktables/kstreams`
+* Run `./gradlew clean build`
+* Run `java -jar build/libs/versioned-ktables-standalone.jar`
+* The command above assumes the properties file `src/main/resources/confluent.properties`; if you're using a different file, add its path to the command, e.g. `java -jar build/libs/versioned-ktables-standalone.jar [path to props file]`
+
+### View the results
+
+The example generates sample records to demonstrate proper temporal join semantics with a versioned `KTable`. It generates a stream
+of strings containing the first half of popular food combinations like `"peanut butter and"`. On the table side, the correct
+strings to complete the food combinations (`"jelly"`) are generated before the stream entries. There are also incorrect matches that
+come later (e.g., `"sardines"`).
+
+In a terminal window, observe with the Confluent CLI that the output topic contains the expected food combinations:
+
+``` plaintext
+confluent local kafka topic consume output-topic --from-beginning
+```
+
+This will yield output like:
+``` plaintext
+peanut butter and jelly
+ham and eggs
+cheese and crackers
+tea and crumpets
+coffee with cream
+```
+Enter `Ctrl-C` to exit the console consumer.
+
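Optionally, you can also confirm that the table topic received the later, mismatched entries that the versioned join correctly skips over. This uses the same console consumer pattern as above, pointed at `table-input-topic`:

``` plaintext
confluent local kafka topic consume table-input-topic --from-beginning
```

You should see both the original completions (`jelly`, `eggs`, ...) and the later, incorrect ones (`sardines`, `an old tire`, ...); only the versions whose timestamps align with the stream records are used in the join.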
+ +
+Confluent Cloud + +### Prerequisites + +* A [Confluent Cloud](https://confluent.cloud/signup) account +* The [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html) + +
+ Creating a cluster in the Confluent Cloud Console + +Create Kafka cluster following [these directions](https://docs.confluent.io/cloud/current/get-started/index.html) + +### Get the configuration + +After creating a cluster Kafka is up and running all you need to next is get the client configurations. + +* In the Confluent Cloud Console, click on the `Clients` option in the left-hand menu. +* Click on the `Java` tile and create the cluster API key and a Schema Registry API key +* Copy the generated properties into the `confluent.properties.orig` file and save it as `confluent.properties` (ignored by git) +
+ +
+ Creating a cluster with the CLI + +If you already have a cloud account, and you don't yet have a Kafka cluster and credentials for connecting to it, you can get started with CLI exclusively. + +* Run the CLI command `confluent plugin install confluent-cloud_kickstart` +* Then execute `confluent cloud-kickstart --name ` which will create a cluster, enable Schema Registry and all required API keys. This will create a cluster with default settings, to see all the options available use `confluent cloud-kickstart --help` +* Copy the generated client configurations (located in `~/Downloads/java_configs_` by default) into `confluent.properties.org` and save as `confluent.properties`. The full location of the properties file is printed to the console. +
+
+### Create the topics `stream-input-topic`, `table-input-topic` and `output-topic`
+
+*_Note: if you created the cluster using the CLI plugin, you can omit the `--cluster <CLUSTER_ID>` flag from the commands below_*
+
+* `confluent kafka topic create stream-input-topic --cluster <CLUSTER_ID>`
+* `confluent kafka topic create table-input-topic --cluster <CLUSTER_ID>`
+* `confluent kafka topic create output-topic --cluster <CLUSTER_ID>`
+
+### Start Kafka Streams
+
+* `cd` into `versioned-ktables/kstreams`
+* Run `./gradlew clean build`
+* Run `java -jar build/libs/versioned-ktables-standalone.jar`
+* The command above assumes the properties file `src/main/resources/confluent.properties`; if you're using a different file, add its path to the command, e.g. `java -jar build/libs/versioned-ktables-standalone.jar [path to props file]`
+
+### View the results
+
+The example generates sample records to demonstrate proper temporal join semantics with a versioned `KTable`. It generates a stream
+of strings containing the first half of popular food combinations like `"peanut butter and"`. On the table side, the correct
+strings to complete the food combinations (`"jelly"`) are generated before the stream entries. There are also incorrect matches that
+come later (e.g., `"sardines"`).
+
+In a terminal window, observe with the Confluent CLI that the output topic contains the expected food combinations:
+
+``` plaintext
+confluent kafka topic consume output-topic --cluster <CLUSTER_ID> --from-beginning
+```
+
+This will yield output like:
+``` plaintext
+peanut butter and jelly
+ham and eggs
+cheese and crackers
+tea and crumpets
+coffee with cream
+```
+Enter `Ctrl-C` to exit the console consumer.
+
+</details>
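The "before" and "later" ordering described above is driven entirely by record timestamps: the data generator produces every record with an explicit timestamp rather than relying on wall-clock time. Below is a trimmed-down sketch of the produce call it uses; the `topic`, `timestamp`, `key`, and `value` variables stand in for the lists it iterates over:

``` java
// Producing to partition 0 with an explicit timestamp lets the table records land
// "before" or "after" the stream records in event time, regardless of when they are sent.
producer.send(new ProducerRecord<>(topic, 0, timestamp, key, value), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(System.out);
    } else {
        System.out.printf("Produced record at offset %d to topic %s %n", metadata.offset(), metadata.topic());
    }
});
```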
diff --git a/versioned-ktables/kstreams/build.gradle b/versioned-ktables/kstreams/build.gradle new file mode 100644 index 00000000..3f4e96cb --- /dev/null +++ b/versioned-ktables/kstreams/build.gradle @@ -0,0 +1,69 @@ +buildscript { + repositories { + mavenCentral() + } +} + +plugins { + id "java" + id "application" + id 'com.github.johnrengelman.shadow' version '8.1.1' +} + +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +application { + mainClass = "io.confluent.developer.VersionedKTableExample" +} + +repositories { + mavenCentral() + maven { url 'https://packages.confluent.io/maven/' } +} + + +dependencies { + implementation project(':common') + implementation "org.slf4j:slf4j-simple:2.0.7" + implementation 'org.apache.kafka:kafka-streams:3.6.0' + implementation('org.apache.kafka:kafka-clients') { + version { + strictly '3.6.0' + } + } + implementation "io.confluent:kafka-streams-avro-serde:7.5.1" + implementation "com.typesafe:config:1.4.2" + + testImplementation "org.apache.kafka:kafka-streams-test-utils:3.6.0" + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2' + testImplementation 'org.hamcrest:hamcrest:2.2' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2' +} + +test { + useJUnitPlatform() + testLogging { + outputs.upToDateWhen { false } + showStandardStreams = true + events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR" + exceptionFormat = "full" + } +} + +jar { + manifest { + attributes( + "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "), + "Main-Class": "io.confluent.developer.VersionedKTableExample" + ) + } +} + +shadowJar { + archiveBaseName = "versioned-ktables-standalone" + archiveClassifier = '' +} diff --git a/versioned-ktables/kstreams/code/.gitignore b/versioned-ktables/kstreams/code/.gitignore deleted file mode 100644 index 35b7d589..00000000 --- a/versioned-ktables/kstreams/code/.gitignore +++ /dev/null @@ -1 +0,0 @@ -tutorial-steps/dev/outputs \ No newline at end of file diff --git a/versioned-ktables/kstreams/code/Makefile b/versioned-ktables/kstreams/code/Makefile deleted file mode 100644 index f0eb610d..00000000 --- a/versioned-ktables/kstreams/code/Makefile +++ /dev/null @@ -1,12 +0,0 @@ -STEPS_DIR := tutorial-steps -DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs -TEMP_DIR := $(shell mktemp -d) -SEQUENCE := "dev, test, ccloud" - -tutorial: - rm -r $(DEV_OUTPUTS_DIR) || true - mkdir $(DEV_OUTPUTS_DIR) - harness-runner ../../../../../_data/harnesses/versioned-ktables/kstreams.yml $(TEMP_DIR) $(SEQUENCE) - diff --ignore-blank-lines --strip-trailing-cr $(STEPS_DIR)/dev/harness-expected-output.txt $(DEV_OUTPUTS_DIR)/actual-output.txt - diff --ignore-blank-lines --strip-trailing-cr $(STEPS_DIR)/dev/harness-expected-output-validate-table.txt $(DEV_OUTPUTS_DIR)/actual-output-validate-table.txt - reset diff --git a/versioned-ktables/kstreams/code/build.gradle b/versioned-ktables/kstreams/code/build.gradle deleted file mode 100644 index 0b26335d..00000000 --- a/versioned-ktables/kstreams/code/build.gradle +++ /dev/null @@ -1,66 +0,0 @@ -buildscript { - repositories { - mavenCentral() - } - dependencies { - classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0" - } -} - -plugins { - id "java" - id "idea" - id "eclipse" -} - -sourceCompatibility = JavaVersion.VERSION_17 -targetCompatibility = JavaVersion.VERSION_17 -version = "0.0.1" - -repositories { - 
mavenCentral() - - maven { - url "https://packages.confluent.io/maven" - } -} - -apply plugin: "com.github.johnrengelman.shadow" - -dependencies { - implementation "org.apache.avro:avro:1.11.1" - implementation "org.slf4j:slf4j-simple:2.0.7" - implementation 'org.apache.kafka:kafka-streams:3.5.1' - implementation ('org.apache.kafka:kafka-clients') { - version { - strictly '3.5.1' - } - } - implementation "io.confluent:kafka-streams-avro-serde:7.4.0" - - testImplementation "org.apache.kafka:kafka-streams-test-utils:3.5.1" - testImplementation "junit:junit:4.13.2" - testImplementation 'org.hamcrest:hamcrest:2.2' -} - -test { - testLogging { - outputs.upToDateWhen { false } - showStandardStreams = true - exceptionFormat = "full" - } -} - -jar { - manifest { - attributes( - "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "), - "Main-Class": "io.confluent.developer.VersionedKTableExample" - ) - } -} - -shadowJar { - archiveBaseName = "versioned-katable-standalone" - archiveClassifier = '' -} diff --git a/versioned-ktables/kstreams/code/configuration/dev.properties b/versioned-ktables/kstreams/code/configuration/dev.properties deleted file mode 100644 index 93789bd7..00000000 --- a/versioned-ktables/kstreams/code/configuration/dev.properties +++ /dev/null @@ -1,14 +0,0 @@ -application.id=versioned-ktables -bootstrap.servers=localhost:9092 - -stream.topic.name=stream-input-topic -stream.topic.partitions=1 -stream.topic.replication.factor=1 - -table.topic.name=table-input-topic -table.topic.partitions=1 -table.topic.replication.factor=1 - -output.topic.name=output-topic -output.topic.partitions=1 -output.topic.replication.factor=1 diff --git a/versioned-ktables/kstreams/code/configuration/test.properties b/versioned-ktables/kstreams/code/configuration/test.properties deleted file mode 100644 index ef284e18..00000000 --- a/versioned-ktables/kstreams/code/configuration/test.properties +++ /dev/null @@ -1,14 +0,0 @@ -application.id=versioned-ktables -state.dir=versioned-ktables-test-state - -stream.topic.name=stream-input-topic -stream.topic.partitions=1 -stream.topic.replication.factor=1 - -table.topic.name=table-input-topic -table.topic.partitions=1 -table.topic.replication.factor=1 - -output.topic.name=output-topic -output.topic.partitions=1 -output.topic.replication.factor=1 \ No newline at end of file diff --git a/versioned-ktables/kstreams/code/docker-compose.yml b/versioned-ktables/kstreams/code/docker-compose.yml deleted file mode 100644 index 9df084c3..00000000 --- a/versioned-ktables/kstreams/code/docker-compose.yml +++ /dev/null @@ -1,31 +0,0 @@ ---- -version: '3' - -services: - - broker: - image: confluentinc/cp-kafka:7.4.1 - hostname: broker - container_name: broker - ports: - - 9092:9092 - environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_PROCESS_ROLES: 'broker,controller' - KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' - KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - # This is 
done for running a single broker in combined mode for local development only - # For multi-node deployments you should generate using the following - # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" - # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh - CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' diff --git a/versioned-ktables/kstreams/code/src/main/java/io/confluent/developer/VersionedKTableExample.java b/versioned-ktables/kstreams/code/src/main/java/io/confluent/developer/VersionedKTableExample.java deleted file mode 100644 index 23e4cbb3..00000000 --- a/versioned-ktables/kstreams/code/src/main/java/io/confluent/developer/VersionedKTableExample.java +++ /dev/null @@ -1,218 +0,0 @@ -package io.confluent.developer; - - -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; - -import java.io.FileInputStream; -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.concurrent.CountDownLatch; - -public class VersionedKTableExample { - - - public Topology buildTopology(Properties allProps) { - final StreamsBuilder builder = new StreamsBuilder(); - final String streamInputTopic = allProps.getProperty("stream.topic.name"); - final String tableInputTopic = allProps.getProperty("table.topic.name"); - final String totalResultOutputTopic = allProps.getProperty("output.topic.name"); - - final Serde stringSerde = Serdes.String(); - - final VersionedBytesStoreSupplier versionedStoreSupplier = Stores.persistentVersionedKeyValueStore("versioned-ktable-store", Duration.ofMinutes(10)); - final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore("non-versioned-table"); - - final KStream streamInput = builder.stream(streamInputTopic, Consumed.with(stringSerde, stringSerde)); - - final KTable tableInput = builder.table(tableInputTopic, - Materialized.as(versionedStoreSupplier) - .withKeySerde(stringSerde) - .withValueSerde(stringSerde)); - final ValueJoiner valueJoiner = (val1, val2) -> val1 + " " + val2; - - streamInput.join(tableInput, valueJoiner) - .peek((key, value) -> System.out.println("Joined value: " + value)) - .to(totalResultOutputTopic, - Produced.with(stringSerde, stringSerde)); - - return builder.build(); - } - - public void createTopics(final Properties allProps) { - try (final AdminClient client = AdminClient.create(allProps)) { - - final List topics = new ArrayList<>(); - - topics.add(new NewTopic( - allProps.getProperty("stream.topic.name"), - Integer.parseInt(allProps.getProperty("stream.topic.partitions")), - 
Short.parseShort(allProps.getProperty("stream.topic.replication.factor")))); - - topics.add(new NewTopic( - allProps.getProperty("table.topic.name"), - Integer.parseInt(allProps.getProperty("table.topic.partitions")), - Short.parseShort(allProps.getProperty("table.topic.replication.factor")))); - - topics.add(new NewTopic( - allProps.getProperty("output.topic.name"), - Integer.parseInt(allProps.getProperty("output.topic.partitions")), - Short.parseShort(allProps.getProperty("output.topic.replication.factor")))); - - client.createTopics(topics); - } - } - - public Properties loadEnvProperties(String fileName) throws IOException { - final Properties allProps = new Properties(); - try (final FileInputStream input = new FileInputStream(fileName)) { - allProps.load(input); - } - return allProps; - } - - public static void main(String[] args) throws Exception { - - if (args.length < 1) { - throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file."); - } - - final VersionedKTableExample instance = new VersionedKTableExample(); - final Properties allProps = instance.loadEnvProperties(args[0]); - final Topology topology = instance.buildTopology(allProps); - - instance.createTopics(allProps); - - TutorialDataGenerator dataGenerator = new TutorialDataGenerator(allProps); - dataGenerator.generate(); - - final KafkaStreams streams = new KafkaStreams(topology, allProps); - final CountDownLatch latch = new CountDownLatch(1); - - // Attach shutdown handler to catch Control-C. - Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { - @Override - public void run() { - streams.close(Duration.ofSeconds(5)); - latch.countDown(); - } - }); - - try { - streams.start(); - latch.await(); - } catch (Throwable e) { - System.exit(1); - } - System.exit(0); - } - - record TutorialDataGenerator(Properties properties) { - - public void generate() { - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - try (Producer producer = new KafkaProducer<>(properties)) { - HashMap>> entryData = new HashMap<>(); - HashMap> dataTimestamps = new HashMap<>(); - Instant now = Instant.now(); - - List> streamMessagesOutOfOrder = Arrays.asList( - KeyValue.pair("one", "peanut butter and"), - KeyValue.pair("two", "ham and"), - KeyValue.pair("three", "cheese and"), - KeyValue.pair("four", "tea and"), - KeyValue.pair("five", "coffee with") - ); - final String topic1 = properties.getProperty("stream.topic.name"); - entryData.put(topic1, streamMessagesOutOfOrder); - - List timestamps = Arrays.asList( - now.minus(50, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(40, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(30, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(20, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(10, ChronoUnit.SECONDS).toEpochMilli() - ); - dataTimestamps.put(topic1, timestamps); - - List> tableMessagesOriginal = Arrays.asList( - KeyValue.pair("one", "jelly"), - KeyValue.pair("two", "eggs"), - KeyValue.pair("three", "crackers"), - KeyValue.pair("four", "crumpets"), - KeyValue.pair("five", "cream")); - final String topic2 = properties.getProperty("table.topic.name"); - entryData.put(topic2, tableMessagesOriginal); - dataTimestamps.put(topic2, timestamps); - - - produceRecords(entryData, producer, dataTimestamps); - entryData.clear(); - dataTimestamps.clear(); - - List> tableMessagesLater = Arrays.asList( - KeyValue.pair("one", 
"sardines"), - KeyValue.pair("two", "an old tire"), - KeyValue.pair("three", "fish eyes"), - KeyValue.pair("four", "moldy bread"), - KeyValue.pair("five", "lots of salt")); - entryData.put(topic2, tableMessagesLater); - - List forwardTimestamps = Arrays.asList( - now.plus(50, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(40, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(30, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(30, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(30, ChronoUnit.SECONDS).toEpochMilli() - ); - dataTimestamps.put(topic2, forwardTimestamps); - - produceRecords(entryData, producer, dataTimestamps); - - } - } - - private static void produceRecords(HashMap>> entryData, - Producer producer, - HashMap> timestampsMap) { - entryData.forEach((topic, list) -> - { - List timestamps = timestampsMap.get(topic); - for (int i = 0; i < list.size(); i++) { - long timestamp = timestamps.get(i); - String key = list.get(i).key; - String value = list.get(i).value; - producer.send(new ProducerRecord<>(topic, 0, timestamp, key, value), (metadata, exception) -> { - if (exception != null) { - exception.printStackTrace(System.out); - } else { - System.out.printf("Produced record at offset %d to topic %s %n", metadata.offset(), metadata.topic()); - } - }); - } - } - ); - } - } - -} diff --git a/versioned-ktables/kstreams/code/src/test/java/io/confluent/developer/VersionedKTableExampleTest.java b/versioned-ktables/kstreams/code/src/test/java/io/confluent/developer/VersionedKTableExampleTest.java deleted file mode 100644 index 7c214d79..00000000 --- a/versioned-ktables/kstreams/code/src/test/java/io/confluent/developer/VersionedKTableExampleTest.java +++ /dev/null @@ -1,110 +0,0 @@ -package io.confluent.developer; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.*; -import org.junit.Test; - -import java.io.IOException; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; - - -public class VersionedKTableExampleTest { - - private final static String TEST_CONFIG_FILE = "configuration/test.properties"; - - @Test - public void versionedKTableTest() throws IOException { - final VersionedKTableExample instance = new VersionedKTableExample(); - final Properties allProps = instance.loadEnvProperties(TEST_CONFIG_FILE); - - final String streamInputTopicName = allProps.getProperty("stream.topic.name"); - final String tableInputTopicName = allProps.getProperty("table.topic.name"); - final String totalResultOutputTopicName = allProps.getProperty("output.topic.name"); - - final Topology topology = instance.buildTopology(allProps); - try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, allProps); - final Serde stringSerde = Serdes.String()) { - final Serializer stringSerializer = stringSerde.serializer(); - final Deserializer keyDeserializer = stringSerde.deserializer(); - - final TestInputTopic streamInputTopic = testDriver.createInputTopic(streamInputTopicName, stringSerializer, stringSerializer); - final TestInputTopic tableInputTopic = testDriver.createInputTopic(tableInputTopicName, stringSerializer, stringSerializer); - - final TestOutputTopic outputTopic = testDriver.createOutputTopic(totalResultOutputTopicName, keyDeserializer, 
stringSerde.deserializer()); - - Instant now = Instant.now(); - - List> streamMessages = Arrays.asList( - KeyValue.pair("one", "peanut butter and"), - KeyValue.pair("two", "ham and"), - KeyValue.pair("three", "cheese and"), - KeyValue.pair("four", "tea and"), - KeyValue.pair("five", "coffee with") - ); - - List timestamps = Arrays.asList( - now.minus(50, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(40, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(30, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(20, ChronoUnit.SECONDS).toEpochMilli(), - now.minus(10, ChronoUnit.SECONDS).toEpochMilli() - ); - - List> tableMessagesOriginal = Arrays.asList( - KeyValue.pair("one", "jelly"), - KeyValue.pair("two", "cheese"), - KeyValue.pair("three", "crackers"), - KeyValue.pair("four", "biscuits"), - KeyValue.pair("five", "cream")); - - List> tableMessagesLater = Arrays.asList( - KeyValue.pair("one", "sardines"), - KeyValue.pair("two", "an old tire"), - KeyValue.pair("three", "fish eyes"), - KeyValue.pair("four", "moldy bread"), - KeyValue.pair("five", "lots of salt")); - - List forwardTimestamps = Arrays.asList( - now.plus(50, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(40, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(30, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(30, ChronoUnit.SECONDS).toEpochMilli(), - now.plus(30, ChronoUnit.SECONDS).toEpochMilli() - ); - sendEvents(tableInputTopic, tableMessagesOriginal, timestamps); - sendEvents(tableInputTopic, tableMessagesLater, forwardTimestamps); - sendEvents(streamInputTopic, streamMessages, timestamps); - - final List> actualEvents = outputTopic.readKeyValuesToList(); - final List> expectedEvents = Arrays.asList( - KeyValue.pair("one", "peanut butter and jelly"), - KeyValue.pair("two", "ham and cheese"), - KeyValue.pair("three", "cheese and crackers"), - KeyValue.pair("four", "tea and biscuits"), - KeyValue.pair("five", "coffee with cream") - ); - - assertEquals(expectedEvents, actualEvents); - } - } - - private void sendEvents(final TestInputTopic topic, - final List> input, - final List timestamps) { - for (int i = 0; i < input.size(); i++) { - final long timestamp = timestamps.get(i); - final String key = input.get(i).key; - final String value = input.get(i).value; - topic.pipeInput(key, value, timestamp); - } - } -} diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/build-project.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/build-project.sh deleted file mode 100644 index 81875953..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/build-project.sh +++ /dev/null @@ -1 +0,0 @@ -./gradlew build diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/build-uberjar.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/build-uberjar.sh deleted file mode 100644 index 12ffd144..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/build-uberjar.sh +++ /dev/null @@ -1 +0,0 @@ -./gradlew shadowJar diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/clean-up.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/clean-up.sh deleted file mode 100644 index 36f5aa98..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/clean-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose down diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer-verify-table.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer-verify-table.sh deleted file mode 100644 index 89061b1f..00000000 --- 
a/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer-verify-table.sh +++ /dev/null @@ -1,8 +0,0 @@ -docker exec -it broker /usr/bin/kafka-console-consumer \ - --topic table-input-topic \ - --bootstrap-server broker:9092 \ - --from-beginning \ - --property print.key=true \ - --property key.separator=" : " \ - --property print.timestamp=true \ - --max-messages 10 diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer.sh deleted file mode 100644 index 6f894bf9..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer.sh +++ /dev/null @@ -1,8 +0,0 @@ -docker exec -it broker /usr/bin/kafka-console-consumer \ - --topic output-topic \ - --bootstrap-server broker:9092 \ - --from-beginning \ - --property print.key=true \ - --property key.separator=" : " \ - --property print.timestamp=true \ - --max-messages 5 diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/docker-compose-up.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/docker-compose-up.sh deleted file mode 100644 index e6fb3f19..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/docker-compose-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose up -d diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output-validate-table.txt b/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output-validate-table.txt deleted file mode 100644 index 3531cf9b..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output-validate-table.txt +++ /dev/null @@ -1,11 +0,0 @@ -CreateTime:1691166221891 : one : jelly -CreateTime:1691166231891 : two : eggs -CreateTime:1691166241891 : three : crackers -CreateTime:1691166251891 : four : crumpets -CreateTime:1691166261891 : five : cream -CreateTime:1691166321891 : one : sardines -CreateTime:1691166311891 : two : an old tire -CreateTime:1691166301891 : three : fish eyes -CreateTime:1691166301891 : four : moldy bread -CreateTime:1691166301891 : five : lots of salt -Processed a total of 10 messages diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output.txt b/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output.txt deleted file mode 100644 index cb0fc898..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output.txt +++ /dev/null @@ -1,6 +0,0 @@ -CreateTime:1691166221891 : one : peanut butter and jelly -CreateTime:1691166231891 : two : ham and eggs -CreateTime:1691166241891 : three : cheese and crackers -CreateTime:1691166251891 : four : tea and crumpets -CreateTime:1691166261891 : five : coffee with cream -Processed a total of 5 messages diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/gradle-wrapper.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/gradle-wrapper.sh deleted file mode 100644 index 28d02d7b..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/gradle-wrapper.sh +++ /dev/null @@ -1 +0,0 @@ -gradle wrapper diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-console-consumer-verify-table.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-console-consumer-verify-table.sh deleted file mode 100644 index 23112441..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-console-consumer-verify-table.sh +++ /dev/null @@ -1,7 +0,0 @@ -docker exec -it broker /usr/bin/kafka-console-consumer \ - --topic table-input-topic \ - --bootstrap-server 
broker:9092 \ - --from-beginning \ - --property print.key=true \ - --property key.separator=" : " \ - --max-messages 10 diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-console-consumer.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-console-consumer.sh deleted file mode 100644 index a8d11aeb..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-console-consumer.sh +++ /dev/null @@ -1,7 +0,0 @@ -docker exec -it broker /usr/bin/kafka-console-consumer \ - --topic output-topic \ - --bootstrap-server broker:9092 \ - --from-beginning \ - --property print.key=true \ - --property key.separator=" : " \ - --max-messages 5 diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-expected-output-validate-table.txt b/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-expected-output-validate-table.txt deleted file mode 100644 index cb268e29..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-expected-output-validate-table.txt +++ /dev/null @@ -1,11 +0,0 @@ -one : jelly -two : eggs -three : crackers -four : crumpets -five : cream -one : sardines -two : an old tire -three : fish eyes -four : moldy bread -five : lots of salt -Processed a total of 10 messages diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-expected-output.txt b/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-expected-output.txt deleted file mode 100644 index 5c072659..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/harness-expected-output.txt +++ /dev/null @@ -1,6 +0,0 @@ -one : peanut butter and jelly -two : ham and eggs -three : cheese and crackers -four : tea and crumpets -five : coffee with cream -Processed a total of 5 messages diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/init.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/init.sh deleted file mode 100644 index b51ae070..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/init.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir versioned-ktables && cd versioned-ktables diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/make-configuration-dir.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/make-configuration-dir.sh deleted file mode 100644 index 878943c6..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/make-configuration-dir.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir configuration diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/make-src-dir.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/make-src-dir.sh deleted file mode 100644 index 4caa4290..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/make-src-dir.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir -p src/main/java/io/confluent/developer diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/run-dev-app.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/run-dev-app.sh deleted file mode 100644 index ec35149b..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/run-dev-app.sh +++ /dev/null @@ -1 +0,0 @@ -java -jar build/libs/versioned-katable-standalone-0.0.1.jar configuration/dev.properties diff --git a/versioned-ktables/kstreams/code/tutorial-steps/dev/wait-for-containers.sh b/versioned-ktables/kstreams/code/tutorial-steps/dev/wait-for-containers.sh deleted file mode 100755 index d013e39a..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/dev/wait-for-containers.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -function readiness_probe { - nc -z 
-w 2 0.0.0.0 9092 -} - -echo "Waiting for the broker to become available ..." - -readiness_probe - -while [[ $? != 0 ]]; do - sleep 5 - readiness_probe -done diff --git a/versioned-ktables/kstreams/code/tutorial-steps/test/invoke-tests.sh b/versioned-ktables/kstreams/code/tutorial-steps/test/invoke-tests.sh deleted file mode 100644 index d2b34833..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/test/invoke-tests.sh +++ /dev/null @@ -1 +0,0 @@ -./gradlew test diff --git a/versioned-ktables/kstreams/code/tutorial-steps/test/make-test-dir.sh b/versioned-ktables/kstreams/code/tutorial-steps/test/make-test-dir.sh deleted file mode 100644 index 589222d8..00000000 --- a/versioned-ktables/kstreams/code/tutorial-steps/test/make-test-dir.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir -p src/test/java/io/confluent/developer diff --git a/versioned-ktables/kstreams/code/gradle/wrapper/gradle-wrapper.properties b/versioned-ktables/kstreams/gradle/wrapper/gradle-wrapper.properties similarity index 74% rename from versioned-ktables/kstreams/code/gradle/wrapper/gradle-wrapper.properties rename to versioned-ktables/kstreams/gradle/wrapper/gradle-wrapper.properties index ae04661e..1af9e093 100644 --- a/versioned-ktables/kstreams/code/gradle/wrapper/gradle-wrapper.properties +++ b/versioned-ktables/kstreams/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/versioned-ktables/kstreams/code/gradlew b/versioned-ktables/kstreams/gradlew similarity index 86% rename from versioned-ktables/kstreams/code/gradlew rename to versioned-ktables/kstreams/gradlew index a69d9cb6..1aa94a42 100755 --- a/versioned-ktables/kstreams/code/gradlew +++ b/versioned-ktables/kstreams/gradlew @@ -55,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -80,13 +80,11 @@ do esac done -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -APP_NAME="Gradle" +# This is normally unused +# shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,22 +131,29 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." 
+ fi fi # Increase the maximum file descriptors if we can. if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -193,11 +198,15 @@ if "$cygwin" || "$msys" ; then done fi -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ diff --git a/versioned-ktables/kstreams/code/gradlew.bat b/versioned-ktables/kstreams/gradlew.bat similarity index 98% rename from versioned-ktables/kstreams/code/gradlew.bat rename to versioned-ktables/kstreams/gradlew.bat index f127cfd4..93e3f59f 100644 --- a/versioned-ktables/kstreams/code/gradlew.bat +++ b/versioned-ktables/kstreams/gradlew.bat @@ -26,6 +26,7 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% diff --git a/versioned-ktables/kstreams/markup/dev/build-project.adoc b/versioned-ktables/kstreams/markup/dev/build-project.adoc deleted file mode 100644 index 63473a98..00000000 --- a/versioned-ktables/kstreams/markup/dev/build-project.adoc +++ /dev/null @@ -1,10 +0,0 @@ -//// - This file assumes use of Avro schemas. If your tutorial does not use Avro, then you'll probably want to change - the wording below. -//// - -Because we will use an Avro schema in our Java code, we'll need to compile it. The Gradle Avro plugin is a part of the build, so it will see your new Avro files, generate Java code for them, and compile those and all other Java sources. Run this command to get it all done: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/build-project.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/build-uberjar.adoc b/versioned-ktables/kstreams/markup/dev/build-uberjar.adoc deleted file mode 100644 index 820dee45..00000000 --- a/versioned-ktables/kstreams/markup/dev/build-uberjar.adoc +++ /dev/null @@ -1,5 +0,0 @@ -In your terminal, run: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/build-uberjar.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/init.adoc b/versioned-ktables/kstreams/markup/dev/init.adoc deleted file mode 100644 index 3509e462..00000000 --- a/versioned-ktables/kstreams/markup/dev/init.adoc +++ /dev/null @@ -1,5 +0,0 @@ -To get started, make a new directory anywhere you'd like for this project: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/init.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/make-build-file.adoc b/versioned-ktables/kstreams/markup/dev/make-build-file.adoc deleted file mode 100644 index 32d6dc15..00000000 --- a/versioned-ktables/kstreams/markup/dev/make-build-file.adoc +++ /dev/null @@ -1,7 +0,0 @@ -Create the following Gradle build file, named `build.gradle` for the project: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/build.gradle %}
-+++++ - -Note that the `sourceCompatibility` and `targetCompatibility` are set to Java 17, so make sure that `java -version` displays 17 before proceeding. diff --git a/versioned-ktables/kstreams/markup/dev/make-config-dir.adoc b/versioned-ktables/kstreams/markup/dev/make-config-dir.adoc deleted file mode 100644 index f15c7efe..00000000 --- a/versioned-ktables/kstreams/markup/dev/make-config-dir.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Next, create a directory for configuration data: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/make-configuration-dir.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/make-dev-file.adoc b/versioned-ktables/kstreams/markup/dev/make-dev-file.adoc deleted file mode 100644 index c6d30ece..00000000 --- a/versioned-ktables/kstreams/markup/dev/make-dev-file.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Then create a development file at `configuration/dev.properties`: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/configuration/dev.properties %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/make-docker-compose.adoc b/versioned-ktables/kstreams/markup/dev/make-docker-compose.adoc deleted file mode 100644 index f5477f19..00000000 --- a/versioned-ktables/kstreams/markup/dev/make-docker-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Next, create the following `docker-compose.yml` file to obtain Confluent Platform (for Kafka in the cloud, see https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]): - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/docker-compose.yml %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/make-gradle-wrapper.adoc b/versioned-ktables/kstreams/markup/dev/make-gradle-wrapper.adoc deleted file mode 100644 index 3544aa10..00000000 --- a/versioned-ktables/kstreams/markup/dev/make-gradle-wrapper.adoc +++ /dev/null @@ -1,5 +0,0 @@ -And be sure to run the following command to obtain the Gradle wrapper: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/gradle-wrapper.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/make-src-dir.adoc b/versioned-ktables/kstreams/markup/dev/make-src-dir.adoc deleted file mode 100644 index 0cb4d55b..00000000 --- a/versioned-ktables/kstreams/markup/dev/make-src-dir.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Create a directory for the Java files in this project: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/make-src-dir.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/make-topology.adoc b/versioned-ktables/kstreams/markup/dev/make-topology.adoc deleted file mode 100644 index d45c7a93..00000000 --- a/versioned-ktables/kstreams/markup/dev/make-topology.adoc +++ /dev/null @@ -1,41 +0,0 @@ - -Before you create the Java class to run the `VersionedKTable` example, let's dive into the main point of this tutorial, creating a versioned `KTable`: - -[source, java] ----- -final VersionedBytesStoreSupplier versionedStoreSupplier = <1> - Stores.persistentVersionedKeyValueStore("versioned-ktable-store", - Duration.ofMinutes(10)) <2> - - -final KTable tableInput = builder.table(tableInputTopic, - Materialized.as(versionedStoreSupplier) <3> - .withKeySerde(stringSerde) - .withValueSerde(stringSerde)); - - - -streamInput.join(tableInput, valueJoiner) <4> - .to(totalResultOutputTopic, - Produced.with(stringSerde, stringSerde)); - ----- - -<1> Creating the versioned state store -<2> Specifying the length of time the table keeps previous versions of a record for querying -<3> Creating the source `KTable` backed by a versioned state store -<4> Using the versioned table in a stream-table join - -So for using a versioned `KTable` you first create a `VersionedBytesStoreSupplier` with the https://javadoc.io/static/org.apache.kafka/kafka-streams/3.5.1/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore-java.lang.String-java.time.Duration-[Stores.persistentVersionedKeyValueStore] factory method providing parameters for the name of the store and the amount of time the store retains previous versions. - -Then you'll use the newly created supplier when creating your source `KTable` with the https://javadoc.io/static/org.apache.kafka/kafka-streams/3.5.1/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String-org.apache.kafka.streams.kstream.Materialized-[StreamBuilder.build(String topic, Materialized)] method as shown above at annotation three. - -Now if your `KStream` https://docs.confluent.io/platform/current/streams/concepts.html#out-of-order-handling[out-of-order records] joining with a `KTable` using a versioned store, the join should result in a temporal correct result as the join of the stream record with a table record is *_aligned by timestamps_* instead of simply using the latest record for the key. - -For more background on versioned state stores https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores[read KIP-889]. - - -Now go ahead and create the Java file at `src/main/java/io/confluent/developer/VersionedKTableExample.java`. -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/src/main/java/io/confluent/developer/VersionedKTableExample.java %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/run-consumer.adoc b/versioned-ktables/kstreams/markup/dev/run-consumer.adoc deleted file mode 100644 index 633c08c9..00000000 --- a/versioned-ktables/kstreams/markup/dev/run-consumer.adoc +++ /dev/null @@ -1,34 +0,0 @@ -//// - This is a sample content file for how to include a console consumer to the tutorial, probably a good idea so the end user can watch the results - of the tutorial. Change the text as needed. - -//// - -Now that you have sent the login events, let's run a consumer to read the output from your streams application - - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer.sh %}
-+++++ - - -You should see something like this - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output.txt %}
-+++++ - - -To prove that the topic backing the KTable does contain the invalid entries run another console consumer command to inspect the contents: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/console-consumer-verify-table.sh %}
-+++++ - - -You should see something similar to this output. Take note of the timestamps of the entries, they show our invalid entries arrived after the correct ones and taking into account the application populates the topics *_before starting_* the Kafka Streams application, you can see how the versioned `KTable` ensures a -semantically correct temporal join. - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/expected-output-validate-table.txt %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/dev/run-dev-app.adoc b/versioned-ktables/kstreams/markup/dev/run-dev-app.adoc deleted file mode 100644 index cc189a1d..00000000 --- a/versioned-ktables/kstreams/markup/dev/run-dev-app.adoc +++ /dev/null @@ -1,41 +0,0 @@ -The application for this tutorial includes a record generator to populate the topics for a stream and table. To demonstrate how the versioned `KTable` works, the application will perform a simple `KStream`-`KTable` join. The stream contains some classic food combinations that aren't complete - the join will the table will fill in the details. - -Here are the records produced to the source topic for the `KStream` - -``` -KeyValue.pair("one", "peanut butter and"), -KeyValue.pair("two", "ham and"), -KeyValue.pair("three", "cheese and"), -KeyValue.pair("four", "tea and"), -KeyValue.pair("five", "coffee with") -``` - -The application will produce two sets of records to the topic for the `KTable`. The first set contains the correct pairings: - -``` -KeyValue.pair("one", "jelly"), -KeyValue.pair("two", "eggs"), -KeyValue.pair("three", "crackers"), -KeyValue.pair("four", "crumpets"), -KeyValue.pair("five", "cream") -``` - -Then a second set of answers is produced to the `KTable` topic, after the initial batch, that don't quite match: - -``` -KeyValue.pair("one", "sardines"), -KeyValue.pair("two", "an old tire"), -KeyValue.pair("three", "fish eyes"), -KeyValue.pair("four", "moldy bread"), -KeyValue.pair("five", "lots of salt") -``` - -Even though there's a second round of records sent to the `KTable`, you'll still get the expected results from the join since your `KTable` is using a versioned store and the timestamps of the stream records and the first batch of table records align. - - -Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won't return, because the application will run until you exit it. There is always another message to process, so streaming applications don't exit until you force them. - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/run-dev-app.sh %}
-+++++ - diff --git a/versioned-ktables/kstreams/markup/dev/short-answer.adoc b/versioned-ktables/kstreams/markup/dev/short-answer.adoc deleted file mode 100644 index 3b2f57a1..00000000 --- a/versioned-ktables/kstreams/markup/dev/short-answer.adoc +++ /dev/null @@ -1,21 +0,0 @@ -You can run your application with link:https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]. - - -In the Kafka Streams application use a `VersionedKeyValueStore` when creating your `KTable`. The [history] retention time shown is just -a placeholder - you are free to use any time that suits your event streaming requirements. - -++++ -
-    
-    final VersionedBytesStoreSupplier versionedStoreSupplier =
-              Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
-                                                       Duration.ofMinutes(10));
-
-
-    final KTable<String, String> tableInput = builder.table(tableInputTopic,
-                Materialized.<String, String>as(versionedStoreSupplier)
-                        .withKeySerde(stringSerde)
-                        .withValueSerde(stringSerde));
-    
-
-++++ diff --git a/versioned-ktables/kstreams/markup/dev/start-compose.adoc b/versioned-ktables/kstreams/markup/dev/start-compose.adoc deleted file mode 100644 index 1bdd2688..00000000 --- a/versioned-ktables/kstreams/markup/dev/start-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -And launch it by running: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/dev/docker-compose-up.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/test/invoke-tests.adoc b/versioned-ktables/kstreams/markup/test/invoke-tests.adoc deleted file mode 100644 index 39d97d23..00000000 --- a/versioned-ktables/kstreams/markup/test/invoke-tests.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Now run the test, which is as simple as: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/test/invoke-tests.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/test/make-test-dir.adoc b/versioned-ktables/kstreams/markup/test/make-test-dir.adoc deleted file mode 100644 index 3dcb3fb5..00000000 --- a/versioned-ktables/kstreams/markup/test/make-test-dir.adoc +++ /dev/null @@ -1,5 +0,0 @@ -First, create a directory for the tests to live in: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/tutorial-steps/test/make-test-dir.sh %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/test/make-test-file.adoc b/versioned-ktables/kstreams/markup/test/make-test-file.adoc deleted file mode 100644 index 90e26484..00000000 --- a/versioned-ktables/kstreams/markup/test/make-test-file.adoc +++ /dev/null @@ -1,5 +0,0 @@ -First, create a test file at `configuration/test.properties`: - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/configuration/test.properties %}
-+++++ diff --git a/versioned-ktables/kstreams/markup/test/make-topology-test.adoc b/versioned-ktables/kstreams/markup/test/make-topology-test.adoc deleted file mode 100644 index 249ae2f1..00000000 --- a/versioned-ktables/kstreams/markup/test/make-topology-test.adoc +++ /dev/null @@ -1,7 +0,0 @@ -Then create the following file at `src/test/java/io/confluent/developer/VersionedKTableExampleTest.java`. Testing a Kafka streams application requires a bit of test harness code, but happily the `org.apache.kafka.streams.TopologyTestDriver` class makes this much more pleasant than it otherwise would be. - -There is only one method in `VersionedKTableExampleTest` annotated with `@Test`, and that is `versionedKTableTest()`. This method actually runs our Streams topology using the `TopologyTestDriver` and some mocked data that is set up inside the test method. - -+++++ -
{% include_raw tutorials/versioned-ktables/kstreams/code/src/test/java/io/confluent/developer/VersionedKTableExampleTest.java %}
-+++++ diff --git a/versioned-ktables/kstreams/code/settings.gradle b/versioned-ktables/kstreams/settings.gradle similarity index 83% rename from versioned-ktables/kstreams/code/settings.gradle rename to versioned-ktables/kstreams/settings.gradle index 683a571e..6899d499 100644 --- a/versioned-ktables/kstreams/code/settings.gradle +++ b/versioned-ktables/kstreams/settings.gradle @@ -8,3 +8,5 @@ */ rootProject.name = 'versioned-ktables' +include ':common' +project(':common').projectDir = file('../../common') diff --git a/versioned-ktables/kstreams/src/main/java/io/confluent/developer/VersionedKTableExample.java b/versioned-ktables/kstreams/src/main/java/io/confluent/developer/VersionedKTableExample.java new file mode 100644 index 00000000..48ca23b3 --- /dev/null +++ b/versioned-ktables/kstreams/src/main/java/io/confluent/developer/VersionedKTableExample.java @@ -0,0 +1,184 @@ +package io.confluent.developer; + + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +public class VersionedKTableExample { + + public static final String STREAM_INPUT_TOPIC = "stream-input-topic"; + public static final String TABLE_INPUT_TOPIC = "table-input-topic"; + public static final String OUTPUT_TOPIC = "output-topic"; + + + public Topology buildTopology() { + final StreamsBuilder builder = new StreamsBuilder(); + + final Serde stringSerde = Serdes.String(); + + final VersionedBytesStoreSupplier versionedStoreSupplier = Stores.persistentVersionedKeyValueStore("versioned-ktable-store", Duration.ofMinutes(10)); + + final KStream streamInput = builder.stream(STREAM_INPUT_TOPIC, Consumed.with(stringSerde, stringSerde)); + + final KTable tableInput = builder.table(TABLE_INPUT_TOPIC, + Materialized.as(versionedStoreSupplier) + .withKeySerde(stringSerde) + .withValueSerde(stringSerde)); + final ValueJoiner valueJoiner = (val1, val2) -> val1 + " " + val2; + + streamInput.join(tableInput, valueJoiner) + .peek((key, value) -> System.out.println("Joined value: " + value)) + .to(OUTPUT_TOPIC, + Produced.with(stringSerde, stringSerde)); + + return builder.build(); + } + + public static void main(String[] args) { + Properties properties; + if (args.length > 0) { + properties = Utils.loadProperties(args[0]); + } else { + properties = Utils.loadProperties(); + } + 
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "versioned-ktable-application");
+
+        TutorialDataGenerator dataGenerator = new TutorialDataGenerator(properties);
+        dataGenerator.generate();
+
+        VersionedKTableExample versionedKTable = new VersionedKTableExample();
+        Topology topology = versionedKTable.buildTopology();
+
+        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                kafkaStreams.close(Duration.ofSeconds(5));
+                countDownLatch.countDown();
+            }));
+            // For local running only; don't do this in production as it wipes out all local state
+            kafkaStreams.cleanUp();
+            kafkaStreams.start();
+            countDownLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    record TutorialDataGenerator(Properties properties) {
+
+        public void generate() {
+            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+            try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
+                HashMap<String, List<KeyValue<String, String>>> entryData = new HashMap<>();
+                HashMap<String, List<Long>> dataTimestamps = new HashMap<>();
+                Instant now = Instant.now();
+
+                List<KeyValue<String, String>> streamMessagesOutOfOrder = Arrays.asList(
+                        KeyValue.pair("one", "peanut butter and"),
+                        KeyValue.pair("two", "ham and"),
+                        KeyValue.pair("three", "cheese and"),
+                        KeyValue.pair("four", "tea and"),
+                        KeyValue.pair("five", "coffee with")
+                );
+                final String topic1 = STREAM_INPUT_TOPIC;
+                entryData.put(topic1, streamMessagesOutOfOrder);
+
+                List<Long> timestamps = Arrays.asList(
+                        now.minus(50, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.minus(40, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.minus(30, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.minus(20, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.minus(10, ChronoUnit.SECONDS).toEpochMilli()
+                );
+                dataTimestamps.put(topic1, timestamps);
+
+                List<KeyValue<String, String>> tableMessagesOriginal = Arrays.asList(
+                        KeyValue.pair("one", "jelly"),
+                        KeyValue.pair("two", "eggs"),
+                        KeyValue.pair("three", "crackers"),
+                        KeyValue.pair("four", "crumpets"),
+                        KeyValue.pair("five", "cream"));
+                final String topic2 = TABLE_INPUT_TOPIC;
+                entryData.put(topic2, tableMessagesOriginal);
+                dataTimestamps.put(topic2, timestamps);
+
+                produceRecords(entryData, producer, dataTimestamps);
+                entryData.clear();
+                dataTimestamps.clear();
+
+                List<KeyValue<String, String>> tableMessagesLater = Arrays.asList(
+                        KeyValue.pair("one", "sardines"),
+                        KeyValue.pair("two", "an old tire"),
+                        KeyValue.pair("three", "fish eyes"),
+                        KeyValue.pair("four", "moldy bread"),
+                        KeyValue.pair("five", "lots of salt"));
+                entryData.put(topic2, tableMessagesLater);
+
+                List<Long> forwardTimestamps = Arrays.asList(
+                        now.plus(50, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.plus(40, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
+                        now.plus(30, ChronoUnit.SECONDS).toEpochMilli()
+                );
+                dataTimestamps.put(topic2, forwardTimestamps);
+
+                produceRecords(entryData, producer, dataTimestamps);
+
+            }
+        }
+
+        private static void produceRecords(HashMap<String, List<KeyValue<String, String>>> entryData,
+                                           Producer<String, String> producer,
+                                           HashMap<String, List<Long>> timestampsMap) {
+            entryData.forEach((topic, list) ->
+            {
+                List<Long> timestamps = timestampsMap.get(topic);
+                for (int i = 0; i < list.size(); i++) {
+                    long timestamp = timestamps.get(i);
+                    String key = list.get(i).key;
+                    String value = list.get(i).value;
+                    producer.send(new ProducerRecord<>(topic, 0, timestamp,
+                            key, value), (metadata, exception) -> {
+                        if (exception != null) {
+                            exception.printStackTrace(System.out);
+                        } else {
+                            System.out.printf("Produced record at offset %d to topic %s %n", metadata.offset(), metadata.topic());
+                        }
+                    });
+                }
+            }
+            );
+        }
+    }
+
+}
diff --git a/versioned-ktables/kstreams/src/main/resources/confluent.properties.orig b/versioned-ktables/kstreams/src/main/resources/confluent.properties.orig
new file mode 100644
index 00000000..a5c2cfd3
--- /dev/null
+++ b/versioned-ktables/kstreams/src/main/resources/confluent.properties.orig
@@ -0,0 +1,23 @@
+# Required connection configs for Kafka Streams
+bootstrap.servers=
+
+# Required for Kafka Streams
+application.id=versioned-ktables-application
+
+
+security.protocol=SASL_SSL
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';
+sasl.mechanism=PLAIN
+# Required for correctness in Apache Kafka clients prior to 2.6
+client.dns.lookup=use_all_dns_ips
+
+# Best practice for higher availability in Apache Kafka clients prior to 3.0
+session.timeout.ms=45000
+
+# Best practice for Kafka producer to prevent data loss
+acks=all
+
+# Required connection configs for Confluent Cloud Schema Registry
+schema.registry.url=
+basic.auth.credentials.source=USER_INFO
+basic.auth.user.info=:
diff --git a/versioned-ktables/kstreams/src/test/java/io/confluent/developer/VersionedKTableExampleTest.java b/versioned-ktables/kstreams/src/test/java/io/confluent/developer/VersionedKTableExampleTest.java
new file mode 100644
index 00000000..f363d0d0
--- /dev/null
+++ b/versioned-ktables/kstreams/src/test/java/io/confluent/developer/VersionedKTableExampleTest.java
@@ -0,0 +1,111 @@
+package io.confluent.developer;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static io.confluent.developer.VersionedKTableExample.OUTPUT_TOPIC;
+import static io.confluent.developer.VersionedKTableExample.STREAM_INPUT_TOPIC;
+import static io.confluent.developer.VersionedKTableExample.TABLE_INPUT_TOPIC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class VersionedKTableExampleTest {
+
+    @Test
+    public void versionedKTableTest() {
+        final VersionedKTableExample instance = new VersionedKTableExample();
+        final Properties properties = new Properties();
+
+        final Topology topology = instance.buildTopology();
+        try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+             final Serde<String> stringSerde = Serdes.String()) {
+            final Serializer<String> stringSerializer = stringSerde.serializer();
+            final Deserializer<String> keyDeserializer = stringSerde.deserializer();
+
+            final TestInputTopic<String, String> streamInputTopic = testDriver.createInputTopic(STREAM_INPUT_TOPIC, stringSerializer, stringSerializer);
+            final TestInputTopic<String, String> tableInputTopic = testDriver.createInputTopic(TABLE_INPUT_TOPIC, stringSerializer, stringSerializer);
+
+            final TestOutputTopic<String, String> outputTopic =
+                    testDriver.createOutputTopic(OUTPUT_TOPIC, keyDeserializer, stringSerde.deserializer());
+
+            Instant now = Instant.now();
+
+            List<KeyValue<String, String>> streamMessages = Arrays.asList(
+                    KeyValue.pair("one", "peanut butter and"),
+                    KeyValue.pair("two", "ham and"),
+                    KeyValue.pair("three", "cheese and"),
+                    KeyValue.pair("four", "tea and"),
+                    KeyValue.pair("five", "coffee with")
+            );
+
+            List<Long> timestamps = Arrays.asList(
+                    now.minus(50, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.minus(40, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.minus(30, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.minus(20, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.minus(10, ChronoUnit.SECONDS).toEpochMilli()
+            );
+
+            List<KeyValue<String, String>> tableMessagesOriginal = Arrays.asList(
+                    KeyValue.pair("one", "jelly"),
+                    KeyValue.pair("two", "cheese"),
+                    KeyValue.pair("three", "crackers"),
+                    KeyValue.pair("four", "biscuits"),
+                    KeyValue.pair("five", "cream"));
+
+            List<KeyValue<String, String>> tableMessagesLater = Arrays.asList(
+                    KeyValue.pair("one", "sardines"),
+                    KeyValue.pair("two", "an old tire"),
+                    KeyValue.pair("three", "fish eyes"),
+                    KeyValue.pair("four", "moldy bread"),
+                    KeyValue.pair("five", "lots of salt"));
+
+            List<Long> forwardTimestamps = Arrays.asList(
+                    now.plus(50, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.plus(40, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
+                    now.plus(30, ChronoUnit.SECONDS).toEpochMilli()
+            );
+            sendEvents(tableInputTopic, tableMessagesOriginal, timestamps);
+            sendEvents(tableInputTopic, tableMessagesLater, forwardTimestamps);
+            sendEvents(streamInputTopic, streamMessages, timestamps);
+
+            final List<KeyValue<String, String>> actualEvents = outputTopic.readKeyValuesToList();
+            final List<KeyValue<String, String>> expectedEvents = Arrays.asList(
+                    KeyValue.pair("one", "peanut butter and jelly"),
+                    KeyValue.pair("two", "ham and cheese"),
+                    KeyValue.pair("three", "cheese and crackers"),
+                    KeyValue.pair("four", "tea and biscuits"),
+                    KeyValue.pair("five", "coffee with cream")
+            );
+
+            assertEquals(expectedEvents, actualEvents);
+        }
+    }
+
+    private void sendEvents(final TestInputTopic<String, String> topic,
+                            final List<KeyValue<String, String>> input,
+                            final List<Long> timestamps) {
+        for (int i = 0; i < input.size(); i++) {
+            final long timestamp = timestamps.get(i);
+            final String key = input.get(i).key;
+            final String value = input.get(i).value;
+            topic.pipeInput(key, value, timestamp);
+        }
+    }
+}