From 87871d0cc35dc9c7ee3b56cd5f35d72fe59354fe Mon Sep 17 00:00:00 2001 From: Cerchie Date: Fri, 16 Feb 2024 08:46:39 -0700 Subject: [PATCH 01/16] inital file add --- .../kafka/.gitignore | 47 +++++ .../kafka/README.md | 195 ++++++++++++++++++ .../kafka/build.gradle | 146 +++++++++++++ .../kafka/settings.gradle | 10 + .../kafka/src/main/avro/purchase.avsc | 10 + .../confluent/developer/AvroConsumerApp.java | 89 ++++++++ .../confluent/developer/AvroProducerApp.java | 101 +++++++++ .../confluent/developer/ProtoConsumerApp.java | 78 +++++++ .../confluent/developer/ProtoProducerApp.java | 97 +++++++++ .../kafka/src/main/proto/purchase.proto | 10 + .../main/resources/confluent.properties.orig | 25 +++ .../kafka/src/main/resources/test.properties | 3 + .../developer/AvroProduceConsumeAppTest.java | 125 +++++++++++ .../developer/ProtoProduceConsumeAppTest.java | 129 ++++++++++++ 14 files changed, 1065 insertions(+) create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/.gitignore create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/README.md create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/build.gradle create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/settings.gradle create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/test.properties create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/AvroProduceConsumeAppTest.java create mode 100644 handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtoProduceConsumeAppTest.java diff --git a/handling-null-values-in-avro-and-protobuf/kafka/.gitignore b/handling-null-values-in-avro-and-protobuf/kafka/.gitignore new file mode 100644 index 00000000..c996a2e2 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/.gitignore @@ -0,0 +1,47 @@ +# Default ignored files +.idea/ +.gradle/ +build/ +out/ + +streams.properties +confluent.properties +resources/confluent.properties + +*.log +notes.txt + +# Ignore Gradle project-specific cache directory +.gradle + +# Ignore Gradle build output directory +build + +**/*.iml +**/*.ipr +**/*.iws + +.idea + +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf +# AWS User-specific +.idea/**/aws.xml +# Generated files +.idea/**/contentModel.xml +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +out/ \ No newline at end of file diff --git 
a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md
new file mode 100644
index 00000000..421081a3
--- /dev/null
+++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md
@@ -0,0 +1,195 @@
+In this tutorial, let's say we're tracking purchase events in Kafka with Confluent Cloud, each with an `item`, a `total_cost`, and a `customer_id`.
+
+Let's say you're using an Avro schema, and sometimes you want to set the `item` to null. How do you adjust the schema to allow a null value? Let's get started.
+
+[Sign up](https://www.confluent.io/) for a Confluent Cloud account if you haven't already.
+
+Log in, and then click 'Environments -> Create Cloud Environment' and create a cloud environment using the defaults there.
+
+Navigate to your environment and click 'Add cluster'. Create a cluster using the default values provided.
+
+Click 'Topics -> Add topic' to create two topics with the default values, one named 'avro-purchase' and the other 'proto-purchase' (we'll cover null values in Protobuf schemas later in the tutorial).
+
+On the right-hand navbar, click 'API keys -> Add key -> Global access'. Download the values as you will need them to run this tutorial.
+
+In the same navbar, click 'Clients -> Choose Your Language -> Java -> Create Schema Registry API key'. Save this key and secret as well as the URL listed in the configuration snippet.
+
+Create a folder anywhere you like to house the code for this tutorial:
+
+```bash
+mkdir handling-null-values
+```
+
+Now, create a file at `handling-null-values/resources/confluent.properties` with these values in it:
+
+```
+# Required connection configs for Kafka producer, consumer, and admin
+bootstrap.servers=BOOTSTRAP_URL/S
+security.protocol=SASL_SSL
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';
+sasl.mechanism=PLAIN
+use.latest.version=true
+# Required for correctness in Apache Kafka clients prior to 2.6
+client.dns.lookup=use_all_dns_ips
+
+wrapper.for.nullables=true
+key.converter=io.confluent.connect.avro.AvroConverter
+key.converter.schema.registry.url=SR_URL/S
+value.converter=io.confluent.connect.avro.AvroConverter
+value.converter.schema.registry.url=CONVERTER_SR_URL/S
+
+# Best practice for higher availability in Apache Kafka clients prior to 3.0
+session.timeout.ms=45000
+
+# Best practice for Kafka producer to prevent data loss
+acks=all
+
+# Required connection configs for Confluent Cloud Schema Registry
+schema.registry.url=SR_URL/S
+basic.auth.credentials.source=USER_INFO
+basic.auth.user.info=API_KEY:SECRET
+```
+
+Replace the USERNAME and PASSWORD values with the Confluent Cloud key and secret, respectively. Add the URL from the Schema Registry client configuration snippet for `SR_URL/S` and add the Schema Registry API key and secret for `basic.auth.user.info`, retaining the colon in the placeholder.
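Both the producer and consumer applications in this tutorial read `confluent.properties` off the classpath at startup via a small `loadProperties()` helper. As a rough sketch of what that loading looks like (the `ConfigLoader` class name here is only for illustration, not part of the tutorial code):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ConfigLoader {

    // Loads confluent.properties from the classpath (src/main/resources).
    // Any I/O problem is rethrown so a misconfigured run fails fast.
    static Properties loadProperties() {
        try (InputStream inputStream = ConfigLoader.class
                .getClassLoader()
                .getResourceAsStream("confluent.properties")) {
            Properties props = new Properties();
            props.load(inputStream);
            return props;
        } catch (IOException exception) {
            throw new RuntimeException(exception);
        }
    }
}
```

The resulting `Properties` are then copied into the producer and consumer config maps before the serializer and Schema Registry settings are added on top.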
+ +Set up an Avro file for the Kafka producers and consumer at: + +```bash +handling-null-values/kafka/code/src/main/java/io/confluent/developer/AvroProducer.java +``` + +and + +```bash +handling-null-values/kafka/code/src/main/java/io/confluent/developer/AvroConsumer.java +``` + +In the `AvroProducer.java` file, we'll create a Kafka producer and send purchase events to a Kafka topic: + +```java +List avroPurchaseEvents = new ArrayList<>(); + + try (final Producer producer = new KafkaProducer<>(avroProducerConfigs)) { + String avroTopic = "avro-purchase"; + + PurchaseAvro avroPurchase = getPurchaseObjectAvro(purchaseBuilder); + PurchaseAvro avroPurchaseII = getPurchaseObjectAvro(purchaseBuilder); + + avroPurchaseEvents.add(avroPurchase); + avroPurchaseEvents.add(avroPurchaseII); + + avroPurchaseEvents.forEach(event -> producer.send(new ProducerRecord<>(avroTopic, event.getCustomerId(), event), ((metadata, exception) -> { + if (exception != null) { + System.err.printf("Producing %s resulted in error %s %n", event, exception); + } else { + System.out.printf("Produced record to topic with Avro schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp()); + } + }))); + + + } + return avroPurchaseEvents; + } +``` + +In this file, we're setting the `item` in each event explicitly to `null`: + +```java + PurchaseAvro getPurchaseObjectAvro(PurchaseAvro.Builder purchaseAvroBuilder) { + purchaseAvroBuilder.setCustomerId("Customer Null").setItem(null) + .setTotalCost(random.nextDouble() * random.nextInt(100)); + return purchaseAvroBuilder.build(); + } +``` + +In the `AvroConsumer.java` file, we'll consume those events and print them to the console: + +```java +avroConsumer.subscribe(Collections.singletonList("avro-purchase")); + + ConsumerRecords avroConsumerRecords = avroConsumer.poll(Duration.ofSeconds(2)); + avroConsumerRecords.forEach(avroConsumerRecord -> { + PurchaseAvro avroPurchase = avroConsumerRecord.value(); + System.out.print("Purchase details consumed from topic with Avro schema { "); + System.out.printf("Customer: %s, ", avroPurchase.getCustomerId()); + System.out.printf("Total Cost: %f, ", avroPurchase.getTotalCost()); + System.out.printf("Item: %s } %n", avroPurchase.getItem()); + + }); + +``` + +Copy and paste the contents of `path to file/s on GitHub` into the `AvroConsumer.java` and `AvroProducer.java` files. + +Create a file at `handling-null-values/kafka/code/src/main/avro/purchase.avsc`. + +Copy and paste the following into that file: + +``` +{ + "type":"record", + "namespace": "io.confluent.developer.avro", + "name":"PurchaseAvro", + "fields": [ + {"name": "item", "type": "string" }, + {"name": "total_cost", "type": "double" }, + {"name": "customer_id", "type": "string"} + ] +} +``` + +If you run the code using `./gradlew runAvroProducer`, you will see that the producer "hangs" and does not produce events. + +For the "item" field in `purchase.avsc`, put: + +``` +{ + "type":"record", + "namespace": "io.confluent.developer.avro", + "name":"PurchaseAvro", + "fields": [ + {"name": "item", "type": ["string", "null"] }, + {"name": "total_cost", "type": "double" }, + {"name": "customer_id", "type": "string"} + ] +} +``` +When you run `./gradlew runAvroProducer` and furthermore, `./gradlew runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully. + +Next, copy and paste the contents of `path to file/s on GitHub` into the `ProtoConsumer.java` and `ProtoProducer.java` files. 
+ +Create a file at `handling-null-values/kafka/code/src/main/proto/purchase.proto`. + +Copy and paste the following into that file: + +``` +syntax = "proto3"; + +package io.confluent.developer.proto; +option java_outer_classname = "PurchaseProto"; + +message Purchase { + string item = 1; + double total_cost = 2; + string customer_id = 3; +} +``` + +Look at `ProtoProducerApp.java`, lines 76-77: + +```java + purchaseBuilder.setCustomerId("Customer Null") + .setTotalCost(random.nextDouble() * random.nextInt(100)); +``` + +We can see that the developer who wrote this app 'forgot' to write the `setItem()` method that adds an item. This means that the value will be null. But when you run you run `./gradlew runProtoProducer` and `./gradlew runProtoConsumer` no errors will arise. That's because Protobuf automatically handles default values. + +Now, if you _explicitly_ set the value of the item to null: + + +```java + purchaseBuilder.setCustomerId("Customer Null").setItem(null) + .setTotalCost(random.nextDouble() * random.nextInt(100)); +``` + +In this case, you'll receive a NullPointer error. \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/build.gradle b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle new file mode 100644 index 00000000..329c896c --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle @@ -0,0 +1,146 @@ +buildscript { + repositories { + mavenCentral() + maven { + url = uri("https://packages.confluent.io/maven/") + } + maven { + url = uri("https://plugins.gradle.org/m2/") + } + maven { + url = uri("https://jitpack.io") + } + } +} + +plugins { + id 'java' + id 'idea' + id 'eclipse' + id "com.google.protobuf" version "0.9.4" + id "com.github.imflog.kafka-schema-registry-gradle-plugin" version "1.13.0" + id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0" +} + +sourceCompatibility = JavaVersion.VERSION_11 +targetCompatibility = JavaVersion.VERSION_11 + +repositories { + mavenCentral() + maven { + url = uri("https://packages.confluent.io/maven/") + } + + maven { + url = uri("https://jitpack.io") + } +} + +dependencies { + implementation 'com.google.protobuf:protobuf-java:3.21.1' + implementation 'org.apache.avro:avro:1.11.0' + implementation "io.confluent:kafka-avro-serializer:7.1.1" + implementation "io.confluent:kafka-protobuf-serializer:7.1.1" + implementation "io.confluent:kafka-protobuf-provider:7.1.1" + + implementation 'org.apache.logging.log4j:log4j-api:2.18.0' + implementation 'org.apache.logging.log4j:log4j-core:2.18.0' + implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0' + + implementation('org.apache.kafka:kafka-streams:3.2.1') { + exclude group: 'org.apache.kafka', module: 'kafka-clients' + } + implementation('org.apache.kafka:kafka-clients:3.2.1') + testImplementation 'junit:junit:4.13.1' + +} + +protobuf { + generatedFilesBaseDir = "${project.buildDir}/generated-main-proto-java" + + protoc { + artifact = 'com.google.protobuf:protoc:3.18.2' + } +} + +// See https://github.com/ImFlog/schema-registry-plugin for more details on configuring the Schema Registry plugin +schemaRegistry { + def props = new Properties() + file("src/main/resources/confluent.properties").withInputStream { props.load(it) } + def srUrl = props.getProperty("schema.registry.url") + def fullAuth = props.getProperty("basic.auth.user.info") + + config { + subject('avro-purchase-value', 'FORWARD') + subject('proto-purchase-value', 'FORWARD') + } + + if (srUrl != null && fullAuth != null) { + // 
println "Using Schema Registry endpoint:${srUrl}, username:${auth[0]},password:${auth[1]}" + def auth = fullAuth.split(":") + url = srUrl + + credentials { + // username is the characters up to the ':' in the basic.auth.user.info property + username = auth[0] + // password is everything after ':' in the basic.auth.user.info property + password = auth[1] + } + } else { + println("Expected to find the [schema.registry.url] and [basic.auth.user.info]") + } + + register { + subject('avro-purchase-value', 'src/main/avro/purchase.avsc', 'AVRO') + subject('proto-purchase-value', 'src/main/proto/purchase.proto', 'PROTOBUF') + } + + download { + // commented out to prevent its download which results in the schema + // definition json being flattened to a single line which doesn't + // match the exercise illustration + // subject('avro-purchase-value', 'src/main/avro', 'purchase') + subject('proto-purchase-value', 'src/main/proto', 'purchase') + } + + compatibility { + subject('avro-purchase-value', 'src/main/avro/purchase.avsc', 'AVRO') + subject('proto-purchase-value', 'src/main/proto/purchase.proto', 'PROTOBUF') + } + + +} + + +task runProtoProducer(type: Exec) { + var clientFullPath = 'io.confluent.developer.ProtoProducerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Protobuf Producer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} + +task runAvroProducer(type: Exec) { + var clientFullPath = 'io.confluent.developer.AvroProducerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Avro Producer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} + + +task runAvroConsumer(type: Exec) { + var clientFullPath = 'io.confluent.developer.AvroConsumerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Avro Consumer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} + +task runProtoConsumer(type: Exec) { + var clientFullPath = 'io.confluent.developer.ProtoConsumerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Protobuf Consumer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/settings.gradle b/handling-null-values-in-avro-and-protobuf/kafka/settings.gradle new file mode 100644 index 00000000..05ab1ec3 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/settings.gradle @@ -0,0 +1,10 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * The settings file is used to specify which projects to include in your build. 
+ * + * Detailed information about configuring a multi-project build in Gradle can be found + * in the user manual at https://docs.gradle.org/7.0/userguide/multi_project_builds.html + */ + +rootProject.name = 'handling-null-values-in-avro-and-protobuf' \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc b/handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc new file mode 100644 index 00000000..4c6188b9 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc @@ -0,0 +1,10 @@ +{ + "type":"record", + "namespace": "io.confluent.developer.avro", + "name":"PurchaseAvro", + "fields": [ + {"name": "item", "type": ["string", "null"] }, + {"name": "total_cost", "type": "double" }, + {"name": "customer_id", "type": "string"} + ] +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java new file mode 100644 index 00000000..e5c884a8 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java @@ -0,0 +1,89 @@ +package io.confluent.developer; + +import io.confluent.developer.avro.PurchaseAvro; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.Properties; + +public class AvroConsumerApp implements AutoCloseable{ + + public void close() { + keepConsumingAvro = false; + ExecutorService executorService = null; + executorService.shutdown(); + } + private volatile boolean keepConsumingAvro = true; + + public ConsumerRecords consumePurchaseEvents() { + Properties properties = loadProperties(); + + Map avroConsumerConfigs = new HashMap<>(); + + + properties.forEach((key, value) -> avroConsumerConfigs.put((String) key, value)); + avroConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-registry-course-consumer"); + avroConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + avroConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + avroConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); + avroConsumerConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); + + + // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry + avroConsumerConfigs.put("schema.registry.url", "SR_URL"); + avroConsumerConfigs.put("basic.auth.credentials.source", "USER_INFO"); + avroConsumerConfigs.put("basic.auth.user.info", "KEY:SECRET"); + + Consumer avroConsumer = new KafkaConsumer<>(avroConsumerConfigs); + + avroConsumer.subscribe(Collections.singletonList("avro-purchase")); + + ConsumerRecords 
avroConsumerRecords = avroConsumer.poll(Duration.ofSeconds(2)); + avroConsumerRecords.forEach(avroConsumerRecord -> { + PurchaseAvro avroPurchase = avroConsumerRecord.value(); + System.out.print("Purchase details consumed from topic with Avro schema { "); + System.out.printf("Customer: %s, ", avroPurchase.getCustomerId()); + System.out.printf("Total Cost: %f, ", avroPurchase.getTotalCost()); + System.out.printf("Item: %s } %n", avroPurchase.getItem()); + + }); + return avroConsumerRecords; + } + + + Properties loadProperties () { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + + + public static void main (String[]args){ + io.confluent.developer.AvroConsumerApp consumerApp = new io.confluent.developer.AvroConsumerApp(); + consumerApp.consumePurchaseEvents(); + } + + + +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java new file mode 100644 index 00000000..890eb254 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java @@ -0,0 +1,101 @@ +package io.confluent.developer; + +import io.confluent.developer.avro.PurchaseAvro; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +public class AvroProducerApp { + + private static final Logger LOG = LoggerFactory.getLogger(AvroProducerApp.class); + private final Random random = new Random(); + private final List items = List.of("shoes", "sun-glasses", "t-shirt"); + + public List producePurchaseEvents() { + PurchaseAvro.Builder purchaseBuilder = PurchaseAvro.newBuilder(); + Properties properties = loadProperties(); + + Map avroProducerConfigs = new HashMap<>(); + + + properties.forEach((key, value) -> avroProducerConfigs.put((String) key, value)); + + avroProducerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + avroProducerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); + avroProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); + avroProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true); + // Setting schema auto-registration to false since we already registered the schema manually following best practice + avroProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); + + // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry + avroProducerConfigs.put("schema.registry.url", "SR_URL"); + 
avroProducerConfigs.put("basic.auth.credentials.source", "USER_INFO"); + avroProducerConfigs.put("basic.auth.user.info", "KEY:SECRET"); + + System.out.printf("Producer now configured for using SchemaRegistry %n"); + + List avroPurchaseEvents = new ArrayList<>(); + try (final Producer producer = new KafkaProducer<>(avroProducerConfigs)) { + String avroTopic = "avro-purchase"; + + PurchaseAvro avroPurchase = getPurchaseObjectAvro(purchaseBuilder); + PurchaseAvro avroPurchaseII = getPurchaseObjectAvro(purchaseBuilder); + + avroPurchaseEvents.add(avroPurchase); + avroPurchaseEvents.add(avroPurchaseII); + + avroPurchaseEvents.forEach(event -> producer.send(new ProducerRecord<>(avroTopic, event.getCustomerId(), event), ((metadata, exception) -> { + if (exception != null) { + System.err.printf("Producing %s resulted in error %s %n", event, exception); + } else { + System.out.printf("Produced record to topic with Avro schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp()); + } + }))); + + + } + return avroPurchaseEvents; + } + + + + PurchaseAvro getPurchaseObjectAvro(PurchaseAvro.Builder purchaseAvroBuilder) { + purchaseAvroBuilder.setCustomerId("Customer Null").setItem(null) + .setTotalCost(random.nextDouble() * random.nextInt(100)); + return purchaseAvroBuilder.build(); + } + + Properties loadProperties() { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + + public static void main(String[] args) { + AvroProducerApp producerApp = new AvroProducerApp(); + producerApp.producePurchaseEvents(); + } + } + + diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java new file mode 100644 index 00000000..67c74bb5 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java @@ -0,0 +1,78 @@ +package io.confluent.developer; + +import io.confluent.developer.proto.PurchaseProto.Purchase; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; + +public class ProtoConsumerApp { + + public void close() { + ExecutorService executorService = null; + executorService.shutdown(); + } + + public ConsumerRecords consumePurchaseEvents() { + Properties properties = loadProperties(); + Map protoConsumerConfigs = new HashMap<>(); + + properties.forEach((key, value) -> protoConsumerConfigs.put((String) key, value)); + protoConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-registry-course-consumer"); + protoConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + 
protoConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + protoConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); + protoConsumerConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Purchase.class); + + // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry + protoConsumerConfigs.put("schema.registry.url", "SR_URL"); + protoConsumerConfigs.put("basic.auth.credentials.source", "USER_INFO"); + protoConsumerConfigs.put("basic.auth.user.info", "KEY:SECRET"); + + + + Consumer protoConsumer = new KafkaConsumer<>(protoConsumerConfigs); + protoConsumer.subscribe(Collections.singletonList("proto-purchase")); + + ConsumerRecords protoConsumerRecords = protoConsumer.poll(Duration.ofSeconds(2)); + protoConsumerRecords.forEach(protoConsumerRecord -> { + Purchase protoPurchase = protoConsumerRecord.value(); + System.out.print("Purchase details consumed from topic with Protobuf schema { "); + System.out.printf("Customer: %s, ", protoPurchase.getCustomerId()); + System.out.printf("Total Cost: %f, ", protoPurchase.getTotalCost()); + System.out.printf("Item: %s } %n", protoPurchase.getItem()); + }); + return protoConsumerRecords; + } + + + Properties loadProperties () { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + + public static void main (String[]args){ + ProtoConsumerApp consumerApp = new ProtoConsumerApp(); + consumerApp.consumePurchaseEvents(); + } + } diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java new file mode 100644 index 00000000..1068d253 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java @@ -0,0 +1,97 @@ +package io.confluent.developer; + + +import io.confluent.developer.proto.PurchaseProto; +import io.confluent.developer.proto.PurchaseProto.Purchase; +import io.confluent.developer.proto.PurchaseProto.Purchase.Builder; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +public class ProtoProducerApp { + private static final Logger LOG = LoggerFactory.getLogger(ProtoProducerApp.class); + private final Random random = new Random(); + public List producePurchaseEvents() { + Builder purchaseBuilder = Purchase.newBuilder(); + Properties properties = loadProperties(); + + Map protoProducerConfigs = new HashMap<>(); + + properties.forEach((key, value) -> protoProducerConfigs.put((String) key, 
value)); + + protoProducerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + protoProducerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class); + // Setting schema auto-registration to false since we already registered the schema manually following best practice + protoProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); + + // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry + protoProducerConfigs.put("schema.registry.url", "SR_URL"); + protoProducerConfigs.put("basic.auth.credentials.source", "USER_INFO"); + protoProducerConfigs.put("basic.auth.user.info", "KEY:SECRET"); + + + System.out.printf("Producer now configured for using SchemaRegistry %n"); + List protoPurchaseEvents = new ArrayList<>(); + + try (final Producer producer = new KafkaProducer<>(protoProducerConfigs)) { + String protoTopic = "proto-purchase"; + + Purchase protoPurchase = getPurchaseObjectProto(purchaseBuilder); + Purchase protoPurchaseII = getPurchaseObjectProto(purchaseBuilder); + + protoPurchaseEvents.add(protoPurchase); + protoPurchaseEvents.add(protoPurchaseII); + + protoPurchaseEvents.forEach(event -> producer.send(new ProducerRecord<>(protoTopic, event.getCustomerId(), event), ((metadata, exception) -> { + if (exception != null) { + System.err.printf("Producing %s resulted in error %s %n", event, exception); + } else { + System.out.printf("Produced record to topic with Protobuf schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp()); + } + }))); + + } + return protoPurchaseEvents; + } + + + + Purchase getPurchaseObjectProto(Builder purchaseBuilder) { + purchaseBuilder.clear(); + purchaseBuilder.setCustomerId("Customer Null") + .setTotalCost(random.nextDouble() * random.nextInt(100)); + return purchaseBuilder.build(); + } + + Properties loadProperties() { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + + public static void main(String[] args) { + ProtoProducerApp producerApp = new ProtoProducerApp(); + producerApp.producePurchaseEvents(); + } +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto b/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto new file mode 100644 index 00000000..5d1bd5a4 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package io.confluent.developer.proto; +option java_outer_classname = "PurchaseProto"; + +message Purchase { + string item = 1; + double total_cost = 2; + string customer_id = 3; +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig new file mode 100644 index 00000000..dc8f4f17 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig @@ -0,0 +1,25 @@ +# Required connection configs for Kafka producer, consumer, and admin +bootstrap.servers=BOOTSTRAP_URL/S +security.protocol=SASL_SSL 
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD'; +sasl.mechanism=PLAIN +use.latest.version=true +# Required for correctness in Apache Kafka clients prior to 2.6 +client.dns.lookup=use_all_dns_ips + +wrapper.for.nullables=true +key.converter=io.confluent.connect.avro.AvroConverter +key.converter.schema.registry.url=SR_URL/S +value.converter=io.confluent.connect.avro.AvroConverter +value.converter.schema.registry.url=CONVERTER_SR_URL/S + +# Best practice for higher availability in Apache Kafka clients prior to 3.0 +session.timeout.ms=45000 + +# Best practice for Kafka producer to prevent data loss +acks=all + +# Required connection configs for Confluent Cloud Schema Registry +schema.registry.url=SR_URL/S +basic.auth.credentials.source=USER_INFO +basic.auth.user.info=API_KEY:SECRET \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/test.properties b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/test.properties new file mode 100644 index 00000000..e49c0168 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/test.properties @@ -0,0 +1,3 @@ +proto.topic=proto-records +avro.topic=avro-records +schema.registry.url=mock://null-values-produce-consume-test \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/AvroProduceConsumeAppTest.java b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/AvroProduceConsumeAppTest.java new file mode 100644 index 00000000..4db8514c --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/AvroProduceConsumeAppTest.java @@ -0,0 +1,125 @@ +package io.confluent.developer; + + +import io.confluent.developer.avro.PurchaseAvro; + +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + +public class AvroProduceConsumeAppTest { + private static final Map commonConfigs = new HashMap<>(); + private static final Properties properties = new Properties(); + private final Serializer stringSerializer = new StringSerializer(); + private AvroProducerApp avroProducerApp; + private AvroConsumerApp avroConsumerApp; + + @BeforeClass + public static void beforeAllTests() throws IOException { + try (FileInputStream fis = new FileInputStream("../main/resources/test.properties")) { + 
properties.load(fis); + properties.forEach((key, value) -> commonConfigs.put((String) key, value)); + } + } + + + @Before + public void setup() { + avroProducerApp = new AvroProducerApp(); + avroConsumerApp = new AvroConsumerApp(); + } + + @Test + @SuppressWarnings("unchecked") + public void testProduceAvroMultipleEvents() { + KafkaAvroSerializer avroSerializer + = new KafkaAvroSerializer(); + avroSerializer.configure(commonConfigs, false); + MockProducer mockAvroProducer + = new MockProducer(true, stringSerializer, (Serializer) avroSerializer); + + List returnedAvroResults = avroProducerApp.producePurchaseEvents(); + + returnedAvroResults.forEach(c -> + { + String purchaseAvroId = c.getCustomerId(); + String purchaseItem = c.getItem(); + assertEquals("Customer Null", purchaseAvroId); + assertEquals(null, purchaseItem); + assertEquals(returnedAvroResults.size(), 2); + }); + + } + + @Test + public void testConsumeAvroEvents() { + MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + String topic = (String) commonConfigs.get("avro.topic"); + + mockConsumer.schedulePollTask(() -> { + addTopicPartitionsAssignment(topic, mockConsumer); + addConsumerRecords(mockConsumer, avroProducerApp.producePurchaseEvents(), PurchaseAvro::getCustomerId, topic); + }); + + ConsumerRecords returnedAvroResults = avroConsumerApp.consumePurchaseEvents(); + List actualAvroResults = new ArrayList<>(); + returnedAvroResults.forEach(c -> + { + PurchaseAvro purchaseAvro = c.value(); + assertEquals("Customer Null", purchaseAvro.getCustomerId()); + assertEquals(null,purchaseAvro.getItem()); + assertEquals(actualAvroResults.size(), 2); + }); + + mockConsumer.schedulePollTask(() -> avroConsumerApp.close()); + } + + private KeyValue toKeyValue(final ProducerRecord producerRecord) { + return KeyValue.pair(producerRecord.key(), producerRecord.value()); + } + + private void addTopicPartitionsAssignment(final String topic, + final MockConsumer mockConsumer) { + final TopicPartition topicPartition = new TopicPartition(topic, 0); + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + mockConsumer.rebalance(Collections.singletonList(topicPartition)); + mockConsumer.updateBeginningOffsets(beginningOffsets); + } + + private void addConsumerRecords(final MockConsumer mockConsumer, + final List records, + final Function keyFunction, + final String topic) { + AtomicInteger offset = new AtomicInteger(0); + records.stream() + .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r)) + .forEach(mockConsumer::addRecord); + } +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtoProduceConsumeAppTest.java b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtoProduceConsumeAppTest.java new file mode 100644 index 00000000..7f62f4a8 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtoProduceConsumeAppTest.java @@ -0,0 +1,129 @@ +package io.confluent.developer; + + +import io.confluent.developer.proto.PurchaseProto; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import 
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +public class ProtobufProduceConsumeAppTest { + private static final Map commonConfigs = new HashMap<>(); + private static final Properties properties = new Properties(); + private final Serializer stringSerializer = new StringSerializer(); + private ProtoProducerApp protoProducerApp; + private ProtoConsumerApp protoConsumerApp; + + @BeforeClass + public static void beforeAllTests() throws IOException { + try (FileInputStream fis = new FileInputStream("../main/resources/test.properties")) { + properties.load(fis); + properties.forEach((key, value) -> commonConfigs.put((String) key, value)); + } + } + + + @Before + public void setup() { + protoProducerApp = new ProtoProducerApp(); + protoConsumerApp = new ProtoConsumerApp(); + } + + + @Test + @SuppressWarnings("unchecked") + public void testProduceProtoMultipleEvents() { + KafkaProtobufSerializer protoSerializer + = new KafkaProtobufSerializer(); + protoSerializer.configure(commonConfigs, false); + MockProducer mockProtoProducer + = new MockProducer(true, stringSerializer, (Serializer) protoSerializer); + + List actualKeyValues = protoProducerApp.producePurchaseEvents(); + + List returnedAvroResults = protoProducerApp.producePurchaseEvents(); + + List> expectedKeyValues = + mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList()); + + returnedAvroResults.forEach(c -> + { + String purchaseProtoId = c.getCustomerId(); + assertEquals("Customer Null", purchaseProtoId); + assertEquals(actualKeyValues.size(), 2); + }); + + } + + @Test + public void testConsumeProtoEvents() { + MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + String topic = (String) commonConfigs.get("proto.topic"); + + mockConsumer.schedulePollTask(() -> { + addTopicPartitionsAssignment(topic, mockConsumer); + addConsumerRecords(mockConsumer, protoProducerApp.producePurchaseEvents(), PurchaseProto.Purchase::getCustomerId, topic); + }); + + ConsumerRecords returnedProtoResults = protoConsumerApp.consumePurchaseEvents(); + List actualProtoResults = new ArrayList<>(); + returnedProtoResults.forEach(c -> + { + PurchaseProto.Purchase purchaseProto = c.value(); + assertEquals("Customer Null", purchaseProto.getCustomerId()); + assertEquals(null, purchaseProto.getItem()); + assertEquals(actualProtoResults.size(), 2); + }); + + mockConsumer.schedulePollTask(() -> protoConsumerApp.close()); + + } + + private KeyValue toKeyValue(final ProducerRecord producerRecord) { + return KeyValue.pair(producerRecord.key(), producerRecord.value()); + } + + private void addTopicPartitionsAssignment(final String topic, + final MockConsumer mockConsumer) { + final TopicPartition topicPartition = new TopicPartition(topic, 0); + final Map 
beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + mockConsumer.rebalance(Collections.singletonList(topicPartition)); + mockConsumer.updateBeginningOffsets(beginningOffsets); + } + + private void addConsumerRecords(final MockConsumer mockConsumer, + final List records, + final Function keyFunction, + final String topic) { + AtomicInteger offset = new AtomicInteger(0); + records.stream() + .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r)) + .forEach(mockConsumer::addRecord); + } +} \ No newline at end of file From 78f953c9054c22cd25625a65b54897964e03213a Mon Sep 17 00:00:00 2001 From: Lucia Cerchie Date: Mon, 26 Feb 2024 07:15:04 -0700 Subject: [PATCH 02/16] Update handling-null-values-in-avro-and-protobuf/kafka/build.gradle Co-authored-by: Dave Troiano --- .../kafka/build.gradle | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/build.gradle b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle index 329c896c..36868bbc 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/build.gradle +++ b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle @@ -22,8 +22,10 @@ plugins { id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0" } -sourceCompatibility = JavaVersion.VERSION_11 -targetCompatibility = JavaVersion.VERSION_11 +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} repositories { mavenCentral() From 16f49bd477c0dbea1e4e5665ba9c96ebf06221b7 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Mon, 26 Feb 2024 08:08:42 -0700 Subject: [PATCH 03/16] update dependencies --- .../kafka/build.gradle | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/build.gradle b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle index 36868bbc..77745615 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/build.gradle +++ b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle @@ -41,18 +41,18 @@ repositories { dependencies { implementation 'com.google.protobuf:protobuf-java:3.21.1' implementation 'org.apache.avro:avro:1.11.0' - implementation "io.confluent:kafka-avro-serializer:7.1.1" - implementation "io.confluent:kafka-protobuf-serializer:7.1.1" - implementation "io.confluent:kafka-protobuf-provider:7.1.1" + implementation "io.confluent:kafka-avro-serializer:7.5.1" + implementation "io.confluent:kafka-protobuf-serializer:7.5.1" + implementation "io.confluent:kafka-protobuf-provider:7.5.1" implementation 'org.apache.logging.log4j:log4j-api:2.18.0' implementation 'org.apache.logging.log4j:log4j-core:2.18.0' implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0' - implementation('org.apache.kafka:kafka-streams:3.2.1') { + implementation('org.apache.kafka:kafka-streams:3.6.1') { exclude group: 'org.apache.kafka', module: 'kafka-clients' } - implementation('org.apache.kafka:kafka-clients:3.2.1') + implementation('org.apache.kafka:kafka-clients:3.6.1') testImplementation 'junit:junit:4.13.1' } From f3619b6448c97f0609b7e7e5f36fe9bbf36cd921 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Mon, 26 Feb 2024 08:09:14 -0700 Subject: [PATCH 04/16] remove tutorial-level .gitignore --- .../kafka/.gitignore | 47 ------------------- 1 file changed, 47 deletions(-) delete mode 100644 handling-null-values-in-avro-and-protobuf/kafka/.gitignore diff --git 
a/handling-null-values-in-avro-and-protobuf/kafka/.gitignore b/handling-null-values-in-avro-and-protobuf/kafka/.gitignore deleted file mode 100644 index c996a2e2..00000000 --- a/handling-null-values-in-avro-and-protobuf/kafka/.gitignore +++ /dev/null @@ -1,47 +0,0 @@ -# Default ignored files -.idea/ -.gradle/ -build/ -out/ - -streams.properties -confluent.properties -resources/confluent.properties - -*.log -notes.txt - -# Ignore Gradle project-specific cache directory -.gradle - -# Ignore Gradle build output directory -build - -**/*.iml -**/*.ipr -**/*.iws - -.idea - -.idea/**/workspace.xml -.idea/**/tasks.xml -.idea/**/usage.statistics.xml -.idea/**/dictionaries -.idea/**/shelf -# AWS User-specific -.idea/**/aws.xml -# Generated files -.idea/**/contentModel.xml -# Sensitive or high-churn files -.idea/**/dataSources/ -.idea/**/dataSources.ids -.idea/**/dataSources.local.xml -.idea/**/sqlDataSources.xml -.idea/**/dynamic.xml -.idea/**/uiDesigner.xml -.idea/**/dbnavigator.xml -# Gradle -.idea/**/gradle.xml -.idea/**/libraries - -out/ \ No newline at end of file From b5b636c5e6a1114638619e18ee3575eb8ab14a22 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Mon, 26 Feb 2024 08:50:36 -0700 Subject: [PATCH 05/16] add how to run section --- .../kafka/README.md | 163 +++++++++--------- 1 file changed, 86 insertions(+), 77 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md index 421081a3..e230aa46 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/README.md +++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md @@ -1,70 +1,11 @@ -In this tutorial let's say we're tracking purchase events in Kafka with Confluent Cloud, each with an `item`, a `total_cost`, and a `customer_id`. - -Let's say you're using an avro schema, and sometimes you want to set the `item` to null. How to adjust the schema to allow a null value? Let's get started. - -[Sign up](https://www.confluent.io/) for a Confluent Cloud account if you haven't already. - -Login, and then click 'Environments -> Create Cloud Environment' and create a cloud environment using the defaults there. - -Navigate to your environment and click 'Add cluster'. Create a cluster using the default values provided. - -Click 'Topics -> Add topic' to create two topics with the default values, one named 'avro-purchase' and the other 'proto-purchase' (we'll cover null values in Protobuf schemas later in the tutorial). - -On the right-hand navbar, click 'API keys -> Add key -> Global access'. Download the values as you will need them to run this tutorial. - -In the same navbar, click 'Clients -> Choose Your Language -> Java -> Create Schema Registry API key'. Save this key and secret as well as the URL listed in the configuration snippet. 
- -Create a folder anywhere you like to house the code this tutorial: - -```bach -mkdir handling-null-values -``` - -Now, create a file at `handling-null-values/resources/confluent.properties` with these values in it: - -``` -# Required connection configs for Kafka producer, consumer, and admin -bootstrap.servers=BOOTSTRAP_URL/S -security.protocol=SASL_SSL -sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD'; -sasl.mechanism=PLAIN -use.latest.version=true -# Required for correctness in Apache Kafka clients prior to 2.6 -client.dns.lookup=use_all_dns_ips - -wrapper.for.nullables=true -key.converter=io.confluent.connect.avro.AvroConverter -key.converter.schema.registry.url=SR_URL/S -value.converter=io.confluent.connect.avro.AvroConverter -value.converter.schema.registry.url=CONVERTER_SR_URL/S - -# Best practice for higher availability in Apache Kafka clients prior to 3.0 -session.timeout.ms=45000 - -# Best practice for Kafka producer to prevent data loss -acks=all +# How to allow `null` field values in Avro and Protobuf -# Required connection configs for Confluent Cloud Schema Registry -schema.registry.url=SR_URL/S -basic.auth.credentials.source=USER_INFO -basic.auth.user.info=API_KEY:SECRET -``` - -Replace the USERNAME and PASSWORD values with the Confluent Cloud key and secret respectively. Add the url from the schema registry client configuration snippet for `SR_URL/S` and add the schema registry API key and secret for `basic.auth.user.info`, retaining the colon in the placeholder. - -Set up an Avro file for the Kafka producers and consumer at: +In this tutorial let's say we're tracking purchase events in Kafka with Confluent Cloud, each with an `item`, a `total_cost`, and a `customer_id`. -```bash -handling-null-values/kafka/code/src/main/java/io/confluent/developer/AvroProducer.java -``` +Let's say you're using an avro schema, and sometimes you want to set the `item` to null. How to adjust the schema to allow a null value? Let's walk through some pertinent bits of the code before running it. -and -```bash -handling-null-values/kafka/code/src/main/java/io/confluent/developer/AvroConsumer.java -``` - -In the `AvroProducer.java` file, we'll create a Kafka producer and send purchase events to a Kafka topic: +In the `AvroProducer.java` file, there's a Kafka producer to send purchase events to a Kafka topic: ```java List avroPurchaseEvents = new ArrayList<>(); @@ -102,7 +43,7 @@ In this file, we're setting the `item` in each event explicitly to `null`: } ``` -In the `AvroConsumer.java` file, we'll consume those events and print them to the console: +In the `AvroConsumer.java` file, those events are consumed and printed to the console: ```java avroConsumer.subscribe(Collections.singletonList("avro-purchase")); @@ -119,11 +60,82 @@ avroConsumer.subscribe(Collections.singletonList("avro-purchase")); ``` -Copy and paste the contents of `path to file/s on GitHub` into the `AvroConsumer.java` and `AvroProducer.java` files. +## Running the example + +You can run this example either with Confluent Cloud or by running the unit test. + +
+ Kafka Streams-based test + +#### Prerequisites -Create a file at `handling-null-values/kafka/code/src/main/avro/purchase.avsc`. +* Java 17, e.g., follow the OpenJDK installation instructions [here](https://openjdk.org/install/) if you don't have Java. -Copy and paste the following into that file: +#### Run the test + +``` +./gradlew test +``` +
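For reference, the test sources added in this PR (`AvroProduceConsumeAppTest`, `ProtoProduceConsumeAppTest`) build `MockProducer` and `MockConsumer` instances and read a `mock://` Schema Registry URL from `test.properties`. A rough sketch of the mock-consumer seeding pattern they use follows; the helper name and generics are illustrative, not the exact test code:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

public class MockConsumerSketch {

    // Illustrative helper: assign one partition, set its beginning offset,
    // and queue a single record so the next poll() returns it.
    static <V> MockConsumer<String, V> seededConsumer(String topic, String key, V value) {
        MockConsumer<String, V> mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
        TopicPartition partition = new TopicPartition(topic, 0);
        mockConsumer.schedulePollTask(() -> {
            mockConsumer.rebalance(Collections.singletonList(partition));
            mockConsumer.updateBeginningOffsets(Map.of(partition, 0L));
            mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0L, key, value));
        });
        return mockConsumer;
    }
}
```

The seeding happens inside `schedulePollTask` so the assignment and the record only show up when the code under test actually calls `poll()`.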
+ Confluent Cloud
+
+#### Prerequisites
+
+ * A [Confluent Cloud](https://confluent.cloud/signup) account
+
+#### Run the commands
+
+[Sign up](https://www.confluent.io/) for a Confluent Cloud account if you haven't already.
+
+Log in, and then click 'Environments -> Create Cloud Environment' and create a cloud environment using the defaults there.
+
+Navigate to your environment and click 'Add cluster'. Create a cluster using the default values provided.
+
+Click 'Topics -> Add topic' to create two topics with the default values, one named 'avro-purchase' and the other 'proto-purchase' (we'll cover null values in Protobuf schemas later in the tutorial).
+
+On the right-hand navbar, click 'API keys -> Add key -> Global access'. Download the values as you will need them to run this tutorial.
+
+In the same navbar, click 'Clients -> Choose Your Language -> Java -> Create Schema Registry API key'. Save this key and secret as well as the URL listed in the configuration snippet.
+
+Create a folder anywhere you like to house the code for this tutorial:
+
+```bash
+mkdir handling-null-values
+```
+
+Now, create a file at `handling-null-values/resources/confluent.properties` with these values in it:
+
+```
+# Required connection configs for Kafka producer, consumer, and admin
+bootstrap.servers=BOOTSTRAP_URL/S
+security.protocol=SASL_SSL
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';
+sasl.mechanism=PLAIN
+use.latest.version=true
+# Required for correctness in Apache Kafka clients prior to 2.6
+client.dns.lookup=use_all_dns_ips
+
+wrapper.for.nullables=true
+key.converter=io.confluent.connect.avro.AvroConverter
+key.converter.schema.registry.url=SR_URL/S
+value.converter=io.confluent.connect.avro.AvroConverter
+value.converter.schema.registry.url=CONVERTER_SR_URL/S
+
+# Best practice for higher availability in Apache Kafka clients prior to 3.0
+session.timeout.ms=45000
+
+# Best practice for Kafka producer to prevent data loss
+acks=all
+
+# Required connection configs for Confluent Cloud Schema Registry
+schema.registry.url=SR_URL/S
+basic.auth.credentials.source=USER_INFO
+basic.auth.user.info=API_KEY:SECRET
+```
+
+Replace the USERNAME and PASSWORD values with the Confluent Cloud key and secret, respectively. Add the URL from the Schema Registry client configuration snippet for `SR_URL/S` and add the Schema Registry API key and secret for `basic.auth.user.info`, retaining the colon in the placeholder.
+
+Inside `handling-null-values/kafka/code/src/main/avro/purchase.avsc` you'll see:
+
+```
+{
+  "type":"record",
+  "namespace": "io.confluent.developer.avro",
+  "name":"PurchaseAvro",
+  "fields": [
+    {"name": "item", "type": ["string", "null"] },
+    {"name": "total_cost", "type": "double" },
+    {"name": "customer_id", "type": "string"}
+  ]
+}
+```
+
+When you run `./gradlew runAvroProducer` and then `./gradlew runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully.
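Because `item` is a union of `string` and `null` in this version of the schema, downstream code should treat `getItem()` as nullable. A small illustrative sketch of one way to guard against it (this helper is not part of the tutorial's apps):

```java
import io.confluent.developer.avro.PurchaseAvro;

public class NullSafePrint {

    // Hypothetical helper: prints a purchase while guarding against the
    // nullable 'item' field generated from the ["string", "null"] union.
    static void printPurchase(PurchaseAvro purchase) {
        String itemLabel = purchase.getItem() != null ? purchase.getItem().toString() : "<no item>";
        System.out.printf("Customer: %s, Item: %s %n", purchase.getCustomerId(), itemLabel);
    }
}
```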
-For the "item" field in `purchase.avsc`, put: +Now remove the `["string", "null"]` in the first field and replace it with `"string"`: ``` { @@ -148,19 +160,16 @@ For the "item" field in `purchase.avsc`, put: "namespace": "io.confluent.developer.avro", "name":"PurchaseAvro", "fields": [ - {"name": "item", "type": ["string", "null"] }, + {"name": "item", "type": "string" }, {"name": "total_cost", "type": "double" }, {"name": "customer_id", "type": "string"} ] } ``` -When you run `./gradlew runAvroProducer` and furthermore, `./gradlew runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully. - -Next, copy and paste the contents of `path to file/s on GitHub` into the `ProtoConsumer.java` and `ProtoProducer.java` files. -Create a file at `handling-null-values/kafka/code/src/main/proto/purchase.proto`. +Now, if you run the code using `./gradlew runAvroProducer`, you will see that the producer "hangs" and does not produce events. If Avro schemas are to accept null values, the `null` type must be set explicitly on the field. -Copy and paste the following into that file: +How about null values in Protobuf schema fields? See: `handling-null-values/kafka/code/src/main/proto/purchase.proto`: ``` syntax = "proto3"; @@ -184,7 +193,7 @@ Look at `ProtoProducerApp.java`, lines 76-77: We can see that the developer who wrote this app 'forgot' to write the `setItem()` method that adds an item. This means that the value will be null. But when you run `./gradlew runProtoProducer` and `./gradlew runProtoConsumer`, no errors will arise. That's because Protobuf automatically handles default values. -Now, if you _explicitly_ set the value of the item to null: +Now, if you _explicitly_ set the value of the item to null like so: ```java From 2530bef9e2e54aed3b882ac83c21644a653fc0d2 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Mon, 26 Feb 2024 09:14:54 -0700 Subject: [PATCH 06/16] add intro --- .../kafka/README.md | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md index e230aa46..d72ad636 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/README.md +++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md @@ -1,8 +1,19 @@ # How to allow `null` field values in Avro and Protobuf -In this tutorial let's say we're tracking purchase events in Kafka with Confluent Cloud, each with an `item`, a `total_cost`, and a `customer_id`. +Let's say you're using an Avro or Protobuf schema, and sometimes you want to set a field named `item` to null. Say it's a pipeline that takes both donations and purchases to be filtered later, and the donations are processed as purchases with null items. How to adjust the schema to allow for a null value? + +Avro natively supports null fields with the 'null' type. In the above example, in order to make the `item` field nullable, you can allow the type to be "string" or "null" in the following manner: + +``` +{"name": "item", "type": ["string", "null"] } +``` -Let's say you're using an avro schema, and sometimes you want to set the `item` to null. How to adjust the schema to allow a null value? Let's walk through some pertinent bits of the code before running it. +In Protobuf, null values that occur due to the item not being set are handled automatically.
But if you want to explicitly set the item to null, you'd have to use a [wrapper](https://tomasbasham.dev/development/2017/09/13/protocol-buffers-and-optional-values.html). + + +Let's walk through some pertinent bits of the code before running it. + +In this tutorial let's say we're tracking purchase events in Kafka with Confluent Cloud, each with an `item`, a `total_cost`, and a `customer_id`. In the `AvroProducer.java` file, there's a Kafka producer to send purchase events to a Kafka topic: From f0cd67e945830468a05ec7c977132cb8978a7a71 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Mon, 26 Feb 2024 11:49:34 -0700 Subject: [PATCH 07/16] remove dupe configs and move test properties file --- .../main/java/io/confluent/developer/AvroConsumerApp.java | 6 ------ .../main/java/io/confluent/developer/AvroProducerApp.java | 5 ----- .../main/java/io/confluent/developer/ProtoConsumerApp.java | 7 ------- .../main/java/io/confluent/developer/ProtoProducerApp.java | 6 ------ ...sumeAppTest.java => ProtobufProduceConsumeAppTest.java} | 0 .../java/io/confluent/developer}/resources/test.properties | 0 6 files changed, 24 deletions(-) rename handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/{ProtoProduceConsumeAppTest.java => ProtobufProduceConsumeAppTest.java} (100%) rename handling-null-values-in-avro-and-protobuf/kafka/src/{main => test/java/io/confluent/developer}/resources/test.properties (100%) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java index e5c884a8..77157ae1 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java @@ -43,12 +43,6 @@ public ConsumerRecords consumePurchaseEvents() { avroConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); avroConsumerConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); - - // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry - avroConsumerConfigs.put("schema.registry.url", "SR_URL"); - avroConsumerConfigs.put("basic.auth.credentials.source", "USER_INFO"); - avroConsumerConfigs.put("basic.auth.user.info", "KEY:SECRET"); - Consumer avroConsumer = new KafkaConsumer<>(avroConsumerConfigs); avroConsumer.subscribe(Collections.singletonList("avro-purchase")); diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java index 890eb254..5cb1b0c5 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java @@ -42,11 +42,6 @@ public List producePurchaseEvents() { // Setting schema auto-registration to false since we already registered the schema manually following best practice avroProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); - // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry - avroProducerConfigs.put("schema.registry.url", "SR_URL"); - 
avroProducerConfigs.put("basic.auth.credentials.source", "USER_INFO"); - avroProducerConfigs.put("basic.auth.user.info", "KEY:SECRET"); - System.out.printf("Producer now configured for using SchemaRegistry %n"); List avroPurchaseEvents = new ArrayList<>(); diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java index 67c74bb5..837c5db0 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java @@ -37,13 +37,6 @@ public ConsumerRecords consumePurchaseEvents() { protoConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); protoConsumerConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Purchase.class); - // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry - protoConsumerConfigs.put("schema.registry.url", "SR_URL"); - protoConsumerConfigs.put("basic.auth.credentials.source", "USER_INFO"); - protoConsumerConfigs.put("basic.auth.user.info", "KEY:SECRET"); - - - Consumer protoConsumer = new KafkaConsumer<>(protoConsumerConfigs); protoConsumer.subscribe(Collections.singletonList("proto-purchase")); diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java index 1068d253..6fd8f86c 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java @@ -39,12 +39,6 @@ public List producePurchaseEvents() { // Setting schema auto-registration to false since we already registered the schema manually following best practice protoProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); - // Duplication of configs loaded from confluent.properties to emphasize what's needed to use SchemaRegistry - protoProducerConfigs.put("schema.registry.url", "SR_URL"); - protoProducerConfigs.put("basic.auth.credentials.source", "USER_INFO"); - protoProducerConfigs.put("basic.auth.user.info", "KEY:SECRET"); - - System.out.printf("Producer now configured for using SchemaRegistry %n"); List protoPurchaseEvents = new ArrayList<>(); diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtoProduceConsumeAppTest.java b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java similarity index 100% rename from handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtoProduceConsumeAppTest.java rename to handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/test.properties b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/resources/test.properties similarity index 100% rename from handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/test.properties rename to 
handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/resources/test.properties From 745341ef12bdb22dc1ab3a282be49ecd90c8834b Mon Sep 17 00:00:00 2001 From: Cerchie Date: Mon, 26 Feb 2024 12:21:13 -0700 Subject: [PATCH 08/16] update consumer group id --- .../java/io/confluent/developer/AvroConsumerApp.java | 12 +++--------- .../io/confluent/developer/ProtoConsumerApp.java | 4 ++-- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java index 77157ae1..8c6a83e0 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java @@ -36,9 +36,8 @@ public ConsumerRecords consumePurchaseEvents() { properties.forEach((key, value) -> avroConsumerConfigs.put((String) key, value)); - avroConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-registry-course-consumer"); + avroConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "null-value-consumer"); avroConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - avroConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); avroConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); avroConsumerConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); @@ -48,18 +47,17 @@ public ConsumerRecords consumePurchaseEvents() { avroConsumer.subscribe(Collections.singletonList("avro-purchase")); ConsumerRecords avroConsumerRecords = avroConsumer.poll(Duration.ofSeconds(2)); + avroConsumerRecords.forEach(avroConsumerRecord -> { + System.out.printf("TEST"); PurchaseAvro avroPurchase = avroConsumerRecord.value(); System.out.print("Purchase details consumed from topic with Avro schema { "); System.out.printf("Customer: %s, ", avroPurchase.getCustomerId()); System.out.printf("Total Cost: %f, ", avroPurchase.getTotalCost()); System.out.printf("Item: %s } %n", avroPurchase.getItem()); - }); return avroConsumerRecords; } - - Properties loadProperties () { try (InputStream inputStream = this.getClass() .getClassLoader() @@ -71,13 +69,9 @@ Properties loadProperties () { throw new RuntimeException(exception); } } - - public static void main (String[]args){ io.confluent.developer.AvroConsumerApp consumerApp = new io.confluent.developer.AvroConsumerApp(); consumerApp.consumePurchaseEvents(); } - - } \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java index 837c5db0..5db69519 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java @@ -30,9 +30,8 @@ public ConsumerRecords consumePurchaseEvents() { Map protoConsumerConfigs = new HashMap<>(); properties.forEach((key, value) -> protoConsumerConfigs.put((String) key, value)); - protoConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-registry-course-consumer"); + protoConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, 
"null-value-consumer"); protoConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - protoConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); protoConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); protoConsumerConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Purchase.class); @@ -41,6 +40,7 @@ public ConsumerRecords consumePurchaseEvents() { protoConsumer.subscribe(Collections.singletonList("proto-purchase")); ConsumerRecords protoConsumerRecords = protoConsumer.poll(Duration.ofSeconds(2)); + protoConsumerRecords.forEach(protoConsumerRecord -> { Purchase protoPurchase = protoConsumerRecord.value(); System.out.print("Purchase details consumed from topic with Protobuf schema { "); From fb6fd8b0c2232be3073c5c8a9d7e5580f02a8f49 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Mon, 26 Feb 2024 12:25:07 -0700 Subject: [PATCH 09/16] add to README and remove non-necessary properties --- .../kafka/README.md | 26 +++++++++++++++---- .../main/resources/confluent.properties.orig | 4 --- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md index d72ad636..d27d2de8 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/README.md +++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md @@ -85,7 +85,7 @@ You can run this example either with Confluent Cloud or by running the unit test #### Run the test ``` -./gradlew test +gradle test ```
Confluent Cloud @@ -161,7 +161,7 @@ Inside `handling-null-values/kafka/code/src/main/avro/purchase.avsc` you'll see: } ``` -When you run `./gradlew runAvroProducer` and furthermore, `./gradlew runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully. +When you run `gradle runAvroProducer` and then `gradle runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully. Now remove the `["string", "null"]` in the first field and replace it with `"string"`: @@ -178,7 +178,7 @@ Now remove the `["string", "null"]` in the first field and replace it with `"str } ``` -Now, if you run the code using `./gradlew runAvroProducer`, you will see that the producer "hangs" and does not produce events. If Avro schemas are to accept null values they need it set explicitly on the field. +Now, if you run the code using `gradle runAvroProducer`, you will see that the producer does not produce events. If Avro schemas are to accept null values, the `null` type must be set explicitly on the field. How about null values in Protobuf schema fields? See: `handling-null-values/kafka/code/src/main/proto/purchase.proto`: @@ -202,7 +202,23 @@ Look at `ProtoProducerApp.java`, lines 76-77: .setTotalCost(random.nextDouble() * random.nextInt(100)); ``` -We can see that the developer who wrote this app 'forgot' to write the `setItem()` method that adds an item. This means that the value will be null. But when you run you run `./gradlew runProtoProducer` and `./gradlew runProtoConsumer` no errors will arise. That's because Protobuf automatically handles default values. +We can see that the developer who wrote this app 'forgot' to write the `setItem()` method that adds an item. This means that the value will be null. But when you run `gradle runProtoProducer` and `gradle runProtoConsumer`, no errors will arise. That's because Protobuf automatically handles default values. + +The message will look something like this in Confluent Cloud: + +```json +{ + "totalCost": 41.20575583194131, + "customerId": "Customer Null" +} +``` + +and like this in the console: + +``` +{ Customer: Customer Null, Total Cost: 21.075714, Item: } + +``` Now, if you _explicitly_ set the value of the item to null like so: @@ -212,4 +212,4 @@ Now, if you _explicitly_ set the value of the item to null like so: .setTotalCost(random.nextDouble() * random.nextInt(100)); ``` -In this case, you'll receive a NullPointer error. \ No newline at end of file +In this case, you'll receive a `NullPointerException`. You can allow null values to be explicitly set with a [protocol wrapper type](https://protobuf.dev/reference/protobuf/google.protobuf/).
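To sketch what that wrapper approach could look like (this is a hypothetical change to `purchase.proto`, not the schema shipped with this tutorial): if `item` were declared as `google.protobuf.StringValue` and the Java classes regenerated, the message would carry an explicit presence flag:

```java
import com.google.protobuf.StringValue;
import io.confluent.developer.proto.Purchase;

public class WrapperSketch {
    public static void main(String[] args) {
        // Hypothetical: purchase.proto imports "google/protobuf/wrappers.proto"
        // and declares `google.protobuf.StringValue item = 1;`
        Purchase withItem = Purchase.newBuilder()
                .setItem(StringValue.newBuilder().setValue("apples").build()) // explicitly present
                .setCustomerId("Customer A")
                .setTotalCost(9.99)
                .build();

        Purchase noItem = Purchase.newBuilder() // item left unset, i.e. "null"
                .setCustomerId("Customer Null")
                .setTotalCost(41.21)
                .build();

        System.out.println(withItem.hasItem()); // true
        System.out.println(noItem.hasItem());   // false: treat as null downstream
    }
}
```

Either way, a consumer would check `hasItem()` rather than relying on Protobuf's empty-string default.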
\ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig index dc8f4f17..11a470b0 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig @@ -8,10 +8,6 @@ use.latest.version=true client.dns.lookup=use_all_dns_ips wrapper.for.nullables=true -key.converter=io.confluent.connect.avro.AvroConverter -key.converter.schema.registry.url=SR_URL/S -value.converter=io.confluent.connect.avro.AvroConverter -value.converter.schema.registry.url=CONVERTER_SR_URL/S # Best practice for higher availability in Apache Kafka clients prior to 3.0 session.timeout.ms=45000 From fb64169691a4501d574237f6b58c2c3c065f1410 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Tue, 27 Feb 2024 09:53:39 -0700 Subject: [PATCH 10/16] make test runnable from top level --- .../kafka/README.md | 14 ++++++-------- settings.gradle | 1 + 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md index d27d2de8..04619e1c 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/README.md +++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md @@ -73,7 +73,8 @@ avroConsumer.subscribe(Collections.singletonList("avro-purchase")); ## Running the example -You can run this example either with Confluent Cloud or by running the unit test. +You can run this example either with Confluent Cloud or by running the unit test. Before getting started with either method, +clone `https://github.com/confluentinc/tutorials.git` and `cd` into `tutorials/handling-null-values-in-avro-and-protobuf`.
Kafka Streams-based test @@ -84,9 +85,12 @@ You can run this example either with Confluent Cloud or by running the unit test #### Run the test +From the top-level directory: + ``` -gradle test +./gradlew clean :handling-null-values-in-avro-and-protobuf:kafka:test --info ``` +
Confluent Cloud @@ -108,12 +112,6 @@ On the right-hand navbar, click 'API keys -> Add key -> Global access'. Download In the same navbar, click 'Clients -> Choose Your Language -> Java -> Create Schema Registry API key'. Save this key and secret as well as the URL listed in the configuration snippet. -Create a folder anywhere you like to house the code this tutorial: - -```bach -mkdir handling-null-values -``` - Now, create a file at `handling-null-values/resources/confluent.properties` with these values in it: ``` diff --git a/settings.gradle b/settings.gradle index 8026cf7a..71a658f8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -45,3 +45,4 @@ include 'tumbling-windows:kstreams' include 'udf:ksql' include 'versioned-ktables:kstreams' include 'window-final-result:kstreams' +include 'handling-null-values-in-avro-and-protobuf:kafka' From 1d930c5516faa92635b1292deb6e3a76085f7af4 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Tue, 27 Feb 2024 10:22:59 -0700 Subject: [PATCH 11/16] update gradle cmds --- handling-null-values-in-avro-and-protobuf/kafka/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md index 04619e1c..d778a023 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/README.md +++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md @@ -159,7 +159,7 @@ Inside `handling-null-values/kafka/code/src/main/avro/purchase.avsc` you'll see: } ``` -When you run `gradle runAvroProducer` and furthermore, `gradle runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully. +When you run `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runAvroProducer` and then `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully. Now remove the `["string", "null"]` in the first field and replace it with `"string"`: @@ -176,7 +176,7 @@ Now remove the `["string", "null"]` in the first field and replace it with `"str } ``` -Now, if you run the code using `gradle runAvroProducer`, you will see that the producer does not produce events. If Avro schemas are to accept null values they need it set explicitly on the field. +Now, if you run the code using `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runAvroProducer`, you will see that the producer does not produce events. If Avro schemas are to accept null values, the `null` type must be set explicitly on the field. How about null values in Protobuf schema fields? See: `handling-null-values/kafka/code/src/main/proto/purchase.proto`: @@ -200,7 +200,7 @@ Look at `ProtoProducerApp.java`, lines 76-77: .setTotalCost(random.nextDouble() * random.nextInt(100)); ``` -We can see that the developer who wrote this app 'forgot' to write the `setItem()` method that adds an item. This means that the value will be null. But when you run you run `gradle runProtoProducer` and `gradle runProtoConsumer` no errors will arise. That's because Protobuf automatically handles default values. +We can see that the developer who wrote this app 'forgot' to write the `setItem()` method that adds an item. This means that the value will be null. But when you run `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runProtoProducer` and `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runProtoConsumer`, no errors will arise.
That's because Protobuf automatically handles default values. The message will look something like this in Confluent Cloud: From b2787043da9355530b9798f0aa4885c90527881d Mon Sep 17 00:00:00 2001 From: Cerchie Date: Tue, 27 Feb 2024 10:24:52 -0700 Subject: [PATCH 12/16] java tabs --- .../kafka/README.md | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md index d778a023..b52fdeba 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/README.md +++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md @@ -19,29 +19,29 @@ In this tutorial let's say we're tracking purchase events in Kafka with Confluen In the `AvroProducer.java` file, there's a Kafka producer to send purchase events to a Kafka topic: ```java -List avroPurchaseEvents = new ArrayList<>(); + List avroPurchaseEvents = new ArrayList<>(); - try (final Producer producer = new KafkaProducer<>(avroProducerConfigs)) { - String avroTopic = "avro-purchase"; + try (final Producer producer = new KafkaProducer<>(avroProducerConfigs)) { + String avroTopic = "avro-purchase"; - PurchaseAvro avroPurchase = getPurchaseObjectAvro(purchaseBuilder); - PurchaseAvro avroPurchaseII = getPurchaseObjectAvro(purchaseBuilder); + PurchaseAvro avroPurchase = getPurchaseObjectAvro(purchaseBuilder); + PurchaseAvro avroPurchaseII = getPurchaseObjectAvro(purchaseBuilder); - avroPurchaseEvents.add(avroPurchase); - avroPurchaseEvents.add(avroPurchaseII); + avroPurchaseEvents.add(avroPurchase); + avroPurchaseEvents.add(avroPurchaseII); - avroPurchaseEvents.forEach(event -> producer.send(new ProducerRecord<>(avroTopic, event.getCustomerId(), event), ((metadata, exception) -> { - if (exception != null) { - System.err.printf("Producing %s resulted in error %s %n", event, exception); - } else { - System.out.printf("Produced record to topic with Avro schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp()); - } - }))); + avroPurchaseEvents.forEach(event -> producer.send(new ProducerRecord<>(avroTopic, event.getCustomerId(), event), ((metadata, exception) -> { + if (exception != null) { + System.err.printf("Producing %s resulted in error %s %n", event, exception); + } else { + System.out.printf("Produced record to topic with Avro schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp()); + } + }))); - } - return avroPurchaseEvents; - } + } + return avroPurchaseEvents; + } ``` In this file, we're setting the `item` in each event explicitly to `null`: @@ -57,17 +57,17 @@ In this file, we're setting the `item` in each event explicitly to `null`: In the `AvroConsumer.java` file, those events are consumed and printed to the console: ```java -avroConsumer.subscribe(Collections.singletonList("avro-purchase")); + avroConsumer.subscribe(Collections.singletonList("avro-purchase")); - ConsumerRecords avroConsumerRecords = avroConsumer.poll(Duration.ofSeconds(2)); - avroConsumerRecords.forEach(avroConsumerRecord -> { - PurchaseAvro avroPurchase = avroConsumerRecord.value(); - System.out.print("Purchase details consumed from topic with Avro schema { "); - System.out.printf("Customer: %s, ", avroPurchase.getCustomerId()); - System.out.printf("Total Cost: %f, ", avroPurchase.getTotalCost()); - System.out.printf("Item: %s } %n", avroPurchase.getItem()); + ConsumerRecords avroConsumerRecords = avroConsumer.poll(Duration.ofSeconds(2)); + 
avroConsumerRecords.forEach(avroConsumerRecord -> { + PurchaseAvro avroPurchase = avroConsumerRecord.value(); + System.out.print("Purchase details consumed from topic with Avro schema { "); + System.out.printf("Customer: %s, ", avroPurchase.getCustomerId()); + System.out.printf("Total Cost: %f, ", avroPurchase.getTotalCost()); + System.out.printf("Item: %s } %n", avroPurchase.getItem()); - }); + }); ``` From 025b2b90ea0f29f48deff4cb44525b0e7a939e7b Mon Sep 17 00:00:00 2001 From: Cerchie Date: Tue, 27 Feb 2024 11:12:21 -0700 Subject: [PATCH 13/16] clean up autogen'd purchase classes --- .../java/io/confluent/developer/ProtoConsumerApp.java | 10 ++++++---- .../java/io/confluent/developer/ProtoProducerApp.java | 9 ++++----- .../kafka/src/main/proto/purchase.proto | 1 + 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java index 5db69519..1e314e2b 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java @@ -1,6 +1,6 @@ package io.confluent.developer; -import io.confluent.developer.proto.PurchaseProto.Purchase; +import io.confluent.developer.proto.Purchase; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig; import org.apache.kafka.clients.consumer.Consumer; @@ -30,17 +30,19 @@ public ConsumerRecords consumePurchaseEvents() { Map protoConsumerConfigs = new HashMap<>(); properties.forEach((key, value) -> protoConsumerConfigs.put((String) key, value)); - protoConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "null-value-consumer"); + protoConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-registry-course-consumer"); protoConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + protoConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); protoConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); protoConsumerConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Purchase.class); - Consumer protoConsumer = new KafkaConsumer<>(protoConsumerConfigs); + + + Consumer protoConsumer = new KafkaConsumer<>(protoConsumerConfigs); protoConsumer.subscribe(Collections.singletonList("proto-purchase")); ConsumerRecords protoConsumerRecords = protoConsumer.poll(Duration.ofSeconds(2)); - protoConsumerRecords.forEach(protoConsumerRecord -> { Purchase protoPurchase = protoConsumerRecord.value(); System.out.print("Purchase details consumed from topic with Protobuf schema { "); diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java index 6fd8f86c..4adc012d 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java @@ -1,9 +1,8 @@ package io.confluent.developer; -import io.confluent.developer.proto.PurchaseProto; -import 
io.confluent.developer.proto.PurchaseProto.Purchase; -import io.confluent.developer.proto.PurchaseProto.Purchase.Builder; +import io.confluent.developer.proto.Purchase; +import io.confluent.developer.proto.Purchase.Builder; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -26,7 +25,7 @@ public class ProtoProducerApp { private static final Logger LOG = LoggerFactory.getLogger(ProtoProducerApp.class); private final Random random = new Random(); - public List producePurchaseEvents() { + public List producePurchaseEvents() { Builder purchaseBuilder = Purchase.newBuilder(); Properties properties = loadProperties(); @@ -88,4 +87,4 @@ public static void main(String[] args) { ProtoProducerApp producerApp = new ProtoProducerApp(); producerApp.producePurchaseEvents(); } -} \ No newline at end of file +} diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto b/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto index 5d1bd5a4..e258cba0 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package io.confluent.developer.proto; option java_outer_classname = "PurchaseProto"; +option java_multiple_files = true; message Purchase { string item = 1; From eb81811fa76f5dc7c340cea357676e197673e741 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Thu, 29 Feb 2024 14:19:20 -0700 Subject: [PATCH 14/16] updating classnames in test --- .../ProtobufProduceConsumeAppTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java index 7f62f4a8..7c49208b 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java @@ -1,7 +1,7 @@ package io.confluent.developer; -import io.confluent.developer.proto.PurchaseProto; +import io.confluent.developer.proto.Purchase; import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -41,7 +41,8 @@ public class ProtobufProduceConsumeAppTest { @BeforeClass public static void beforeAllTests() throws IOException { - try (FileInputStream fis = new FileInputStream("../main/resources/test.properties")) { + + try (FileInputStream fis = new FileInputStream("resources/test.properties")) { properties.load(fis); properties.forEach((key, value) -> commonConfigs.put((String) key, value)); } @@ -64,9 +65,9 @@ public void testProduceProtoMultipleEvents() { MockProducer mockProtoProducer = new MockProducer(true, stringSerializer, (Serializer) protoSerializer); - List actualKeyValues = protoProducerApp.producePurchaseEvents(); + List actualKeyValues = protoProducerApp.producePurchaseEvents(); - List returnedAvroResults = protoProducerApp.producePurchaseEvents(); + List returnedAvroResults = protoProducerApp.producePurchaseEvents(); List> expectedKeyValues = 
mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList()); @@ -82,19 +83,19 @@ public void testProduceProtoMultipleEvents() { @Test public void testConsumeProtoEvents() { - MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); String topic = (String) commonConfigs.get("proto.topic"); mockConsumer.schedulePollTask(() -> { addTopicPartitionsAssignment(topic, mockConsumer); - addConsumerRecords(mockConsumer, protoProducerApp.producePurchaseEvents(), PurchaseProto.Purchase::getCustomerId, topic); + addConsumerRecords(mockConsumer, protoProducerApp.producePurchaseEvents(), Purchase::getCustomerId, topic); }); - ConsumerRecords returnedProtoResults = protoConsumerApp.consumePurchaseEvents(); - List actualProtoResults = new ArrayList<>(); + ConsumerRecords returnedProtoResults = protoConsumerApp.consumePurchaseEvents(); + List actualProtoResults = new ArrayList<>(); returnedProtoResults.forEach(c -> { - PurchaseProto.Purchase purchaseProto = c.value(); + Purchase purchaseProto = c.value(); assertEquals("Customer Null", purchaseProto.getCustomerId()); assertEquals(null, purchaseProto.getItem()); assertEquals(actualProtoResults.size(), 2); From 61229e56040eed3dfcbba6137d79d9ffcf9e08a0 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Thu, 29 Feb 2024 14:26:28 -0700 Subject: [PATCH 15/16] update path --- .../io/confluent/developer/ProtobufProduceConsumeAppTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java index 7c49208b..79bdea8e 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java @@ -41,8 +41,7 @@ public class ProtobufProduceConsumeAppTest { @BeforeClass public static void beforeAllTests() throws IOException { - - try (FileInputStream fis = new FileInputStream("resources/test.properties")) { + try (FileInputStream fis = new FileInputStream("src/test/java/io/confluent/developer/resources/test.properties")) { properties.load(fis); properties.forEach((key, value) -> commonConfigs.put((String) key, value)); } From 34c5c56258591b5e6acd9b7605c6b23241b95c21 Mon Sep 17 00:00:00 2001 From: Cerchie Date: Thu, 29 Feb 2024 14:42:22 -0700 Subject: [PATCH 16/16] test password --- handling-null-values-in-avro-and-protobuf/kafka/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md index b52fdeba..9375e182 100644 --- a/handling-null-values-in-avro-and-protobuf/kafka/README.md +++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md @@ -1,5 +1,4 @@ # How to allow `null` field values in Avro and Protobuf - Let's say you're using an Avro or Protobuf schema, and sometimes you want to set a field named `item` to null. Say it's a pipeline that takes both donations and purchases to be filtered later, and the donations are processed as purchases with null items. How to adjust the schema to allow for a null value? Avro natively supports null fields with the 'null' type. 
In the above example, in order to make the `item` field nullable, you can allow the type to be "string" or "null" in the following manner: