diff --git a/handling-null-values-in-avro-and-protobuf/kafka/README.md b/handling-null-values-in-avro-and-protobuf/kafka/README.md
new file mode 100644
index 00000000..9375e182
--- /dev/null
+++ b/handling-null-values-in-avro-and-protobuf/kafka/README.md
@@ -0,0 +1,228 @@
+# How to allow `null` field values in Avro and Protobuf
+Let's say you're using an Avro or Protobuf schema and sometimes you want to set a field named `item` to null. Say it's a pipeline that takes both donations and purchases to be filtered later, and the donations are processed as purchases with null items. How do you adjust the schema to allow a null value?
+
+Avro natively supports null fields with the 'null' type. In the above example, in order to make the `item` field nullable, you can allow the type to be "string" or "null" in the following manner:
+
+```
+{"name": "item", "type": ["string", "null"] }
+```
+
+In Protobuf, null values that occur because the item is not set are handled automatically. But if you want to explicitly set the item to null, you'd have to use a [wrapper](https://tomasbasham.dev/development/2017/09/13/protocol-buffers-and-optional-values.html).
+
+Let's walk through some pertinent bits of the code before running it.
+
+In this tutorial, let's say we're tracking purchase events in Kafka with Confluent Cloud, each with an `item`, a `total_cost`, and a `customer_id`.
+
+In the `AvroProducerApp.java` file, there's a Kafka producer that sends purchase events to a Kafka topic:
+
+```java
+        List<PurchaseAvro> avroPurchaseEvents = new ArrayList<>();
+
+        try (final Producer<String, PurchaseAvro> producer = new KafkaProducer<>(avroProducerConfigs)) {
+            String avroTopic = "avro-purchase";
+
+            PurchaseAvro avroPurchase = getPurchaseObjectAvro(purchaseBuilder);
+            PurchaseAvro avroPurchaseII = getPurchaseObjectAvro(purchaseBuilder);
+
+            avroPurchaseEvents.add(avroPurchase);
+            avroPurchaseEvents.add(avroPurchaseII);
+
+            avroPurchaseEvents.forEach(event -> producer.send(new ProducerRecord<>(avroTopic, event.getCustomerId(), event), ((metadata, exception) -> {
+                if (exception != null) {
+                    System.err.printf("Producing %s resulted in error %s %n", event, exception);
+                } else {
+                    System.out.printf("Produced record to topic with Avro schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp());
+                }
+            })));
+
+        }
+        return avroPurchaseEvents;
+    }
+```
+
+In this file, we're setting the `item` in each event explicitly to `null`:
+
+```java
+    PurchaseAvro getPurchaseObjectAvro(PurchaseAvro.Builder purchaseAvroBuilder) {
+        purchaseAvroBuilder.setCustomerId("Customer Null").setItem(null)
+                .setTotalCost(random.nextDouble() * random.nextInt(100));
+        return purchaseAvroBuilder.build();
+    }
+```
+
+In the `AvroConsumerApp.java` file, those events are consumed and printed to the console:
+
+```java
+        avroConsumer.subscribe(Collections.singletonList("avro-purchase"));
+
+        ConsumerRecords<String, PurchaseAvro> avroConsumerRecords = avroConsumer.poll(Duration.ofSeconds(2));
+        avroConsumerRecords.forEach(avroConsumerRecord -> {
+            PurchaseAvro avroPurchase = avroConsumerRecord.value();
+            System.out.print("Purchase details consumed from topic with Avro schema { ");
+            System.out.printf("Customer: %s, ", avroPurchase.getCustomerId());
+            System.out.printf("Total Cost: %f, ", avroPurchase.getTotalCost());
+            System.out.printf("Item: %s } %n", avroPurchase.getItem());
+
+        });
+```
+
+## Running the example
+
+You can run this example either with Confluent Cloud or by running the unit test.
Before getting started with either method, +clone `https://github.com/confluentinc/tutorials.git` and `cd` into `tutorials/handling-null-values-in-avro-and-protobuf`. + +
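Concretely, assuming `git` is installed and you clone into the default directory name, that is:

```
git clone https://github.com/confluentinc/tutorials.git
cd tutorials/handling-null-values-in-avro-and-protobuf
```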
+ Kafka Streams-based test + +#### Prerequisites + +* Java 17, e.g., follow the OpenJDK installation instructions [here](https://openjdk.org/install/) if you don't have Java. + +#### Run the test + +From the top-level directory: + +``` +./gradlew clean :handling-null-values-in-avro-and-protobuf:kafka:test --info +``` + +
+ Confluent Cloud
+
+#### Prerequisites
+
+ * A [Confluent Cloud](https://confluent.cloud/signup) account
+
+#### Run the commands
+
+[Sign up](https://www.confluent.io/) for a Confluent Cloud account if you haven't already.
+
+Log in, then click 'Environments -> Create Cloud Environment' and create a cloud environment using the defaults there.
+
+Navigate to your environment and click 'Add cluster'. Create a cluster using the default values provided.
+
+Click 'Topics -> Add topic' to create two topics with the default values, one named 'avro-purchase' and the other 'proto-purchase' (we'll cover null values in Protobuf schemas later in the tutorial).
+
+On the right-hand navbar, click 'API keys -> Add key -> Global access'. Download the values, as you will need them to run this tutorial.
+
+In the same navbar, click 'Clients -> Choose Your Language -> Java -> Create Schema Registry API key'. Save this key and secret, as well as the URL listed in the configuration snippet.
+
+Now, create a file at `handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties` with these values in it:
+
+```
+# Required connection configs for Kafka producer, consumer, and admin
+bootstrap.servers=BOOTSTRAP_URL/S
+security.protocol=SASL_SSL
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';
+sasl.mechanism=PLAIN
+use.latest.version=true
+# Required for correctness in Apache Kafka clients prior to 2.6
+client.dns.lookup=use_all_dns_ips
+
+wrapper.for.nullables=true
+key.converter=io.confluent.connect.avro.AvroConverter
+key.converter.schema.registry.url=SR_URL/S
+value.converter=io.confluent.connect.avro.AvroConverter
+value.converter.schema.registry.url=CONVERTER_SR_URL/S
+
+# Best practice for higher availability in Apache Kafka clients prior to 3.0
+session.timeout.ms=45000
+
+# Best practice for Kafka producer to prevent data loss
+acks=all
+
+# Required connection configs for Confluent Cloud Schema Registry
+schema.registry.url=SR_URL/S
+basic.auth.credentials.source=USER_INFO
+basic.auth.user.info=API_KEY:SECRET
+```
+
+Replace the USERNAME and PASSWORD values with the Confluent Cloud API key and secret, respectively. Add the URL from the Schema Registry client configuration snippet for `SR_URL/S`, and add the Schema Registry API key and secret for `basic.auth.user.info`, retaining the colon in the placeholder.
+
+Inside `handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc` you'll see:
+
+```
+{
+  "type":"record",
+  "namespace": "io.confluent.developer.avro",
+  "name":"PurchaseAvro",
+  "fields": [
+    {"name": "item", "type": ["string", "null"] },
+    {"name": "total_cost", "type": "double" },
+    {"name": "customer_id", "type": "string"}
+  ]
+}
+```
+
+When you run `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runAvroProducer` and then `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runAvroConsumer`, you'll see that the events with null items are produced and consumed successfully.
+
+Now remove the `["string", "null"]` union in the first field and replace it with `"string"`:
+
+```
+{
+  "type":"record",
+  "namespace": "io.confluent.developer.avro",
+  "name":"PurchaseAvro",
+  "fields": [
+    {"name": "item", "type": "string" },
+    {"name": "total_cost", "type": "double" },
+    {"name": "customer_id", "type": "string"}
+  ]
+}
+```
+
+Now, if you run the code using `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runAvroProducer`, you will see that the producer does not produce events: for an Avro schema field to accept null values, the `null` type must be declared explicitly on the field.
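A side note, not part of the original walkthrough: if you also want producers to be able to omit the field entirely, Avro lets you give the nullable field a default. A minimal sketch of what that field definition could look like:

```
{"name": "item", "type": ["null", "string"], "default": null }
```

Avro requires a field's default value to match the first type in its union, which is why the `null` branch is listed first in this variant.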
+What about null values in Protobuf schema fields? See `handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto`:
+
+```
+syntax = "proto3";
+
+package io.confluent.developer.proto;
+option java_outer_classname = "PurchaseProto";
+option java_multiple_files = true;
+
+message Purchase {
+  string item = 1;
+  double total_cost = 2;
+  string customer_id = 3;
+}
+```
+
+Look at `ProtoProducerApp.java`, lines 76-77:
+
+```java
+        purchaseBuilder.setCustomerId("Customer Null")
+                .setTotalCost(random.nextDouble() * random.nextInt(100));
+```
+
+We can see that the developer who wrote this app 'forgot' to call the `setItem()` method that sets an item, so the `item` field is never set. But when you run `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runProtoProducer` and `./gradlew :handling-null-values-in-avro-and-protobuf:kafka:runProtoConsumer`, no errors arise. That's because Protobuf automatically falls back to the field's default value (an empty string here).
+
+The message will look something like this in Confluent Cloud:
+
+```json
+{
+  "totalCost": 41.20575583194131,
+  "customerId": "Customer Null"
+}
+```
+
+and like this in the console:
+
+```
+{ Customer: Customer Null, Total Cost: 21.075714, Item:  }
+```
+
+Now, if you _explicitly_ set the value of the item to null like so:
+
+```java
+        purchaseBuilder.setCustomerId("Customer Null").setItem(null)
+                .setTotalCost(random.nextDouble() * random.nextInt(100));
+```
+
+you'll get a `NullPointerException`. You can allow null values to be explicitly represented with a [protocol wrapper type](https://protobuf.dev/reference/protobuf/google.protobuf/); a sketch of the wrapper approach appears at the very end of this document.
\ No newline at end of file
diff --git a/handling-null-values-in-avro-and-protobuf/kafka/build.gradle b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle
new file mode 100644
index 00000000..77745615
--- /dev/null
+++ b/handling-null-values-in-avro-and-protobuf/kafka/build.gradle
@@ -0,0 +1,148 @@
+buildscript {
+    repositories {
+        mavenCentral()
+        maven {
+            url = uri("https://packages.confluent.io/maven/")
+        }
+        maven {
+            url = uri("https://plugins.gradle.org/m2/")
+        }
+        maven {
+            url = uri("https://jitpack.io")
+        }
+    }
+}
+
+plugins {
+    id 'java'
+    id 'idea'
+    id 'eclipse'
+    id "com.google.protobuf" version "0.9.4"
+    id "com.github.imflog.kafka-schema-registry-gradle-plugin" version "1.13.0"
+    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
+}
+
+java {
+    sourceCompatibility = JavaVersion.VERSION_17
+    targetCompatibility = JavaVersion.VERSION_17
+}
+
+repositories {
+    mavenCentral()
+    maven {
+        url = uri("https://packages.confluent.io/maven/")
+    }
+
+    maven {
+        url = uri("https://jitpack.io")
+    }
+}
+
+dependencies {
+    implementation 'com.google.protobuf:protobuf-java:3.21.1'
+    implementation 'org.apache.avro:avro:1.11.0'
+    implementation "io.confluent:kafka-avro-serializer:7.5.1"
+    implementation "io.confluent:kafka-protobuf-serializer:7.5.1"
+    implementation "io.confluent:kafka-protobuf-provider:7.5.1"
+
+    implementation 'org.apache.logging.log4j:log4j-api:2.18.0'
+    implementation 'org.apache.logging.log4j:log4j-core:2.18.0'
+    implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0'
+
+    implementation('org.apache.kafka:kafka-streams:3.6.1') {
+        exclude group: 'org.apache.kafka', module: 'kafka-clients'
+    }
+    implementation('org.apache.kafka:kafka-clients:3.6.1')
+    testImplementation 'junit:junit:4.13.1'
+
+}
+
+protobuf {
+    generatedFilesBaseDir =
"${project.buildDir}/generated-main-proto-java" + + protoc { + artifact = 'com.google.protobuf:protoc:3.18.2' + } +} + +// See https://github.com/ImFlog/schema-registry-plugin for more details on configuring the Schema Registry plugin +schemaRegistry { + def props = new Properties() + file("src/main/resources/confluent.properties").withInputStream { props.load(it) } + def srUrl = props.getProperty("schema.registry.url") + def fullAuth = props.getProperty("basic.auth.user.info") + + config { + subject('avro-purchase-value', 'FORWARD') + subject('proto-purchase-value', 'FORWARD') + } + + if (srUrl != null && fullAuth != null) { + // println "Using Schema Registry endpoint:${srUrl}, username:${auth[0]},password:${auth[1]}" + def auth = fullAuth.split(":") + url = srUrl + + credentials { + // username is the characters up to the ':' in the basic.auth.user.info property + username = auth[0] + // password is everything after ':' in the basic.auth.user.info property + password = auth[1] + } + } else { + println("Expected to find the [schema.registry.url] and [basic.auth.user.info]") + } + + register { + subject('avro-purchase-value', 'src/main/avro/purchase.avsc', 'AVRO') + subject('proto-purchase-value', 'src/main/proto/purchase.proto', 'PROTOBUF') + } + + download { + // commented out to prevent its download which results in the schema + // definition json being flattened to a single line which doesn't + // match the exercise illustration + // subject('avro-purchase-value', 'src/main/avro', 'purchase') + subject('proto-purchase-value', 'src/main/proto', 'purchase') + } + + compatibility { + subject('avro-purchase-value', 'src/main/avro/purchase.avsc', 'AVRO') + subject('proto-purchase-value', 'src/main/proto/purchase.proto', 'PROTOBUF') + } + + +} + + +task runProtoProducer(type: Exec) { + var clientFullPath = 'io.confluent.developer.ProtoProducerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Protobuf Producer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} + +task runAvroProducer(type: Exec) { + var clientFullPath = 'io.confluent.developer.AvroProducerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Avro Producer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} + + +task runAvroConsumer(type: Exec) { + var clientFullPath = 'io.confluent.developer.AvroConsumerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Avro Consumer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} + +task runProtoConsumer(type: Exec) { + var clientFullPath = 'io.confluent.developer.ProtoConsumerApp' + dependsOn assemble + group = "Execution" + description = "This task executes the Protobuf Consumer for the exercise about handling null values" + commandLine "java", "-classpath", sourceSets.main.runtimeClasspath.getAsPath(), clientFullPath +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/settings.gradle b/handling-null-values-in-avro-and-protobuf/kafka/settings.gradle new file mode 100644 index 00000000..05ab1ec3 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/settings.gradle @@ -0,0 +1,10 @@ +/* + * This file was generated by the Gradle 'init' task. 
+ * + * The settings file is used to specify which projects to include in your build. + * + * Detailed information about configuring a multi-project build in Gradle can be found + * in the user manual at https://docs.gradle.org/7.0/userguide/multi_project_builds.html + */ + +rootProject.name = 'handling-null-values-in-avro-and-protobuf' \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc b/handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc new file mode 100644 index 00000000..4c6188b9 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/avro/purchase.avsc @@ -0,0 +1,10 @@ +{ + "type":"record", + "namespace": "io.confluent.developer.avro", + "name":"PurchaseAvro", + "fields": [ + {"name": "item", "type": ["string", "null"] }, + {"name": "total_cost", "type": "double" }, + {"name": "customer_id", "type": "string"} + ] +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java new file mode 100644 index 00000000..8c6a83e0 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroConsumerApp.java @@ -0,0 +1,77 @@ +package io.confluent.developer; + +import io.confluent.developer.avro.PurchaseAvro; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.Properties; + +public class AvroConsumerApp implements AutoCloseable{ + + public void close() { + keepConsumingAvro = false; + ExecutorService executorService = null; + executorService.shutdown(); + } + private volatile boolean keepConsumingAvro = true; + + public ConsumerRecords consumePurchaseEvents() { + Properties properties = loadProperties(); + + Map avroConsumerConfigs = new HashMap<>(); + + + properties.forEach((key, value) -> avroConsumerConfigs.put((String) key, value)); + avroConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "null-value-consumer"); + avroConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + avroConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + avroConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); + avroConsumerConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); + + Consumer avroConsumer = new KafkaConsumer<>(avroConsumerConfigs); + + avroConsumer.subscribe(Collections.singletonList("avro-purchase")); + + ConsumerRecords avroConsumerRecords = avroConsumer.poll(Duration.ofSeconds(2)); + + avroConsumerRecords.forEach(avroConsumerRecord -> { + System.out.printf("TEST"); + PurchaseAvro avroPurchase = avroConsumerRecord.value(); + System.out.print("Purchase details consumed from 
topic with Avro schema { "); + System.out.printf("Customer: %s, ", avroPurchase.getCustomerId()); + System.out.printf("Total Cost: %f, ", avroPurchase.getTotalCost()); + System.out.printf("Item: %s } %n", avroPurchase.getItem()); + }); + return avroConsumerRecords; + } + Properties loadProperties () { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + public static void main (String[]args){ + io.confluent.developer.AvroConsumerApp consumerApp = new io.confluent.developer.AvroConsumerApp(); + consumerApp.consumePurchaseEvents(); + } + +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java new file mode 100644 index 00000000..5cb1b0c5 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/AvroProducerApp.java @@ -0,0 +1,96 @@ +package io.confluent.developer; + +import io.confluent.developer.avro.PurchaseAvro; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +public class AvroProducerApp { + + private static final Logger LOG = LoggerFactory.getLogger(AvroProducerApp.class); + private final Random random = new Random(); + private final List items = List.of("shoes", "sun-glasses", "t-shirt"); + + public List producePurchaseEvents() { + PurchaseAvro.Builder purchaseBuilder = PurchaseAvro.newBuilder(); + Properties properties = loadProperties(); + + Map avroProducerConfigs = new HashMap<>(); + + + properties.forEach((key, value) -> avroProducerConfigs.put((String) key, value)); + + avroProducerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + avroProducerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); + avroProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); + avroProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true); + // Setting schema auto-registration to false since we already registered the schema manually following best practice + avroProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); + + System.out.printf("Producer now configured for using SchemaRegistry %n"); + + List avroPurchaseEvents = new ArrayList<>(); + try (final Producer producer = new KafkaProducer<>(avroProducerConfigs)) { + String avroTopic = "avro-purchase"; + + PurchaseAvro avroPurchase = getPurchaseObjectAvro(purchaseBuilder); + PurchaseAvro avroPurchaseII = getPurchaseObjectAvro(purchaseBuilder); + + avroPurchaseEvents.add(avroPurchase); + 
avroPurchaseEvents.add(avroPurchaseII); + + avroPurchaseEvents.forEach(event -> producer.send(new ProducerRecord<>(avroTopic, event.getCustomerId(), event), ((metadata, exception) -> { + if (exception != null) { + System.err.printf("Producing %s resulted in error %s %n", event, exception); + } else { + System.out.printf("Produced record to topic with Avro schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp()); + } + }))); + + + } + return avroPurchaseEvents; + } + + + + PurchaseAvro getPurchaseObjectAvro(PurchaseAvro.Builder purchaseAvroBuilder) { + purchaseAvroBuilder.setCustomerId("Customer Null").setItem(null) + .setTotalCost(random.nextDouble() * random.nextInt(100)); + return purchaseAvroBuilder.build(); + } + + Properties loadProperties() { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + + public static void main(String[] args) { + AvroProducerApp producerApp = new AvroProducerApp(); + producerApp.producePurchaseEvents(); + } + } + + diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java new file mode 100644 index 00000000..1e314e2b --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoConsumerApp.java @@ -0,0 +1,73 @@ +package io.confluent.developer; + +import io.confluent.developer.proto.Purchase; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; + +public class ProtoConsumerApp { + + public void close() { + ExecutorService executorService = null; + executorService.shutdown(); + } + + public ConsumerRecords consumePurchaseEvents() { + Properties properties = loadProperties(); + Map protoConsumerConfigs = new HashMap<>(); + + properties.forEach((key, value) -> protoConsumerConfigs.put((String) key, value)); + protoConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-registry-course-consumer"); + protoConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + protoConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + protoConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); + protoConsumerConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Purchase.class); + + + + Consumer protoConsumer = new KafkaConsumer<>(protoConsumerConfigs); + protoConsumer.subscribe(Collections.singletonList("proto-purchase")); + + ConsumerRecords protoConsumerRecords = protoConsumer.poll(Duration.ofSeconds(2)); + 
protoConsumerRecords.forEach(protoConsumerRecord -> { + Purchase protoPurchase = protoConsumerRecord.value(); + System.out.print("Purchase details consumed from topic with Protobuf schema { "); + System.out.printf("Customer: %s, ", protoPurchase.getCustomerId()); + System.out.printf("Total Cost: %f, ", protoPurchase.getTotalCost()); + System.out.printf("Item: %s } %n", protoPurchase.getItem()); + }); + return protoConsumerRecords; + } + + + Properties loadProperties () { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + + public static void main (String[]args){ + ProtoConsumerApp consumerApp = new ProtoConsumerApp(); + consumerApp.consumePurchaseEvents(); + } + } diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java new file mode 100644 index 00000000..4adc012d --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/java/io/confluent/developer/ProtoProducerApp.java @@ -0,0 +1,90 @@ +package io.confluent.developer; + + +import io.confluent.developer.proto.Purchase; +import io.confluent.developer.proto.Purchase.Builder; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +public class ProtoProducerApp { + private static final Logger LOG = LoggerFactory.getLogger(ProtoProducerApp.class); + private final Random random = new Random(); + public List producePurchaseEvents() { + Builder purchaseBuilder = Purchase.newBuilder(); + Properties properties = loadProperties(); + + Map protoProducerConfigs = new HashMap<>(); + + properties.forEach((key, value) -> protoProducerConfigs.put((String) key, value)); + + protoProducerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + protoProducerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class); + // Setting schema auto-registration to false since we already registered the schema manually following best practice + protoProducerConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); + + System.out.printf("Producer now configured for using SchemaRegistry %n"); + List protoPurchaseEvents = new ArrayList<>(); + + try (final Producer producer = new KafkaProducer<>(protoProducerConfigs)) { + String protoTopic = "proto-purchase"; + + Purchase protoPurchase = getPurchaseObjectProto(purchaseBuilder); + Purchase protoPurchaseII = getPurchaseObjectProto(purchaseBuilder); + + protoPurchaseEvents.add(protoPurchase); + protoPurchaseEvents.add(protoPurchaseII); + + protoPurchaseEvents.forEach(event -> 
producer.send(new ProducerRecord<>(protoTopic, event.getCustomerId(), event), ((metadata, exception) -> { + if (exception != null) { + System.err.printf("Producing %s resulted in error %s %n", event, exception); + } else { + System.out.printf("Produced record to topic with Protobuf schema at offset %s with timestamp %d %n", metadata.offset(), metadata.timestamp()); + } + }))); + + } + return protoPurchaseEvents; + } + + + + Purchase getPurchaseObjectProto(Builder purchaseBuilder) { + purchaseBuilder.clear(); + purchaseBuilder.setCustomerId("Customer Null") + .setTotalCost(random.nextDouble() * random.nextInt(100)); + return purchaseBuilder.build(); + } + + Properties loadProperties() { + try (InputStream inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("confluent.properties")) { + Properties props = new Properties(); + props.load(inputStream); + return props; + } catch (IOException exception) { + throw new RuntimeException(exception); + } + } + + public static void main(String[] args) { + ProtoProducerApp producerApp = new ProtoProducerApp(); + producerApp.producePurchaseEvents(); + } +} diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto b/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto new file mode 100644 index 00000000..e258cba0 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/proto/purchase.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package io.confluent.developer.proto; +option java_outer_classname = "PurchaseProto"; +option java_multiple_files = true; + +message Purchase { + string item = 1; + double total_cost = 2; + string customer_id = 3; +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig new file mode 100644 index 00000000..11a470b0 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/main/resources/confluent.properties.orig @@ -0,0 +1,21 @@ +# Required connection configs for Kafka producer, consumer, and admin +bootstrap.servers=BOOTSTRAP_URL/S +security.protocol=SASL_SSL +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD'; +sasl.mechanism=PLAIN +use.latest.version=true +# Required for correctness in Apache Kafka clients prior to 2.6 +client.dns.lookup=use_all_dns_ips + +wrapper.for.nullables=true + +# Best practice for higher availability in Apache Kafka clients prior to 3.0 +session.timeout.ms=45000 + +# Best practice for Kafka producer to prevent data loss +acks=all + +# Required connection configs for Confluent Cloud Schema Registry +schema.registry.url=SR_URL/S +basic.auth.credentials.source=USER_INFO +basic.auth.user.info=API_KEY:SECRET \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/AvroProduceConsumeAppTest.java b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/AvroProduceConsumeAppTest.java new file mode 100644 index 00000000..4db8514c --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/AvroProduceConsumeAppTest.java @@ -0,0 +1,125 @@ +package io.confluent.developer; + + +import io.confluent.developer.avro.PurchaseAvro; + +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import 
org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + +public class AvroProduceConsumeAppTest { + private static final Map commonConfigs = new HashMap<>(); + private static final Properties properties = new Properties(); + private final Serializer stringSerializer = new StringSerializer(); + private AvroProducerApp avroProducerApp; + private AvroConsumerApp avroConsumerApp; + + @BeforeClass + public static void beforeAllTests() throws IOException { + try (FileInputStream fis = new FileInputStream("../main/resources/test.properties")) { + properties.load(fis); + properties.forEach((key, value) -> commonConfigs.put((String) key, value)); + } + } + + + @Before + public void setup() { + avroProducerApp = new AvroProducerApp(); + avroConsumerApp = new AvroConsumerApp(); + } + + @Test + @SuppressWarnings("unchecked") + public void testProduceAvroMultipleEvents() { + KafkaAvroSerializer avroSerializer + = new KafkaAvroSerializer(); + avroSerializer.configure(commonConfigs, false); + MockProducer mockAvroProducer + = new MockProducer(true, stringSerializer, (Serializer) avroSerializer); + + List returnedAvroResults = avroProducerApp.producePurchaseEvents(); + + returnedAvroResults.forEach(c -> + { + String purchaseAvroId = c.getCustomerId(); + String purchaseItem = c.getItem(); + assertEquals("Customer Null", purchaseAvroId); + assertEquals(null, purchaseItem); + assertEquals(returnedAvroResults.size(), 2); + }); + + } + + @Test + public void testConsumeAvroEvents() { + MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + String topic = (String) commonConfigs.get("avro.topic"); + + mockConsumer.schedulePollTask(() -> { + addTopicPartitionsAssignment(topic, mockConsumer); + addConsumerRecords(mockConsumer, avroProducerApp.producePurchaseEvents(), PurchaseAvro::getCustomerId, topic); + }); + + ConsumerRecords returnedAvroResults = avroConsumerApp.consumePurchaseEvents(); + List actualAvroResults = new ArrayList<>(); + returnedAvroResults.forEach(c -> + { + PurchaseAvro purchaseAvro = c.value(); + assertEquals("Customer Null", purchaseAvro.getCustomerId()); + assertEquals(null,purchaseAvro.getItem()); + assertEquals(actualAvroResults.size(), 2); + }); + + mockConsumer.schedulePollTask(() -> avroConsumerApp.close()); + } + + private KeyValue toKeyValue(final ProducerRecord producerRecord) { + return KeyValue.pair(producerRecord.key(), producerRecord.value()); + } + + private void addTopicPartitionsAssignment(final String topic, + final MockConsumer mockConsumer) { + 
final TopicPartition topicPartition = new TopicPartition(topic, 0); + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + mockConsumer.rebalance(Collections.singletonList(topicPartition)); + mockConsumer.updateBeginningOffsets(beginningOffsets); + } + + private void addConsumerRecords(final MockConsumer mockConsumer, + final List records, + final Function keyFunction, + final String topic) { + AtomicInteger offset = new AtomicInteger(0); + records.stream() + .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r)) + .forEach(mockConsumer::addRecord); + } +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java new file mode 100644 index 00000000..79bdea8e --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/ProtobufProduceConsumeAppTest.java @@ -0,0 +1,129 @@ +package io.confluent.developer; + + +import io.confluent.developer.proto.Purchase; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +public class ProtobufProduceConsumeAppTest { + private static final Map commonConfigs = new HashMap<>(); + private static final Properties properties = new Properties(); + private final Serializer stringSerializer = new StringSerializer(); + private ProtoProducerApp protoProducerApp; + private ProtoConsumerApp protoConsumerApp; + + @BeforeClass + public static void beforeAllTests() throws IOException { + try (FileInputStream fis = new FileInputStream("src/test/java/io/confluent/developer/resources/test.properties")) { + properties.load(fis); + properties.forEach((key, value) -> commonConfigs.put((String) key, value)); + } + } + + + @Before + public void setup() { + protoProducerApp = new ProtoProducerApp(); + protoConsumerApp = new ProtoConsumerApp(); + } + + + @Test + @SuppressWarnings("unchecked") + public void testProduceProtoMultipleEvents() { + KafkaProtobufSerializer protoSerializer + = new KafkaProtobufSerializer(); + protoSerializer.configure(commonConfigs, false); + MockProducer mockProtoProducer + = new MockProducer(true, stringSerializer, (Serializer) protoSerializer); + + List actualKeyValues = protoProducerApp.producePurchaseEvents(); + + 
List returnedAvroResults = protoProducerApp.producePurchaseEvents(); + + List> expectedKeyValues = + mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList()); + + returnedAvroResults.forEach(c -> + { + String purchaseProtoId = c.getCustomerId(); + assertEquals("Customer Null", purchaseProtoId); + assertEquals(actualKeyValues.size(), 2); + }); + + } + + @Test + public void testConsumeProtoEvents() { + MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + String topic = (String) commonConfigs.get("proto.topic"); + + mockConsumer.schedulePollTask(() -> { + addTopicPartitionsAssignment(topic, mockConsumer); + addConsumerRecords(mockConsumer, protoProducerApp.producePurchaseEvents(), Purchase::getCustomerId, topic); + }); + + ConsumerRecords returnedProtoResults = protoConsumerApp.consumePurchaseEvents(); + List actualProtoResults = new ArrayList<>(); + returnedProtoResults.forEach(c -> + { + Purchase purchaseProto = c.value(); + assertEquals("Customer Null", purchaseProto.getCustomerId()); + assertEquals(null, purchaseProto.getItem()); + assertEquals(actualProtoResults.size(), 2); + }); + + mockConsumer.schedulePollTask(() -> protoConsumerApp.close()); + + } + + private KeyValue toKeyValue(final ProducerRecord producerRecord) { + return KeyValue.pair(producerRecord.key(), producerRecord.value()); + } + + private void addTopicPartitionsAssignment(final String topic, + final MockConsumer mockConsumer) { + final TopicPartition topicPartition = new TopicPartition(topic, 0); + final Map beginningOffsets = new HashMap<>(); + beginningOffsets.put(topicPartition, 0L); + mockConsumer.rebalance(Collections.singletonList(topicPartition)); + mockConsumer.updateBeginningOffsets(beginningOffsets); + } + + private void addConsumerRecords(final MockConsumer mockConsumer, + final List records, + final Function keyFunction, + final String topic) { + AtomicInteger offset = new AtomicInteger(0); + records.stream() + .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r)) + .forEach(mockConsumer::addRecord); + } +} \ No newline at end of file diff --git a/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/resources/test.properties b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/resources/test.properties new file mode 100644 index 00000000..e49c0168 --- /dev/null +++ b/handling-null-values-in-avro-and-protobuf/kafka/src/test/java/io/confluent/developer/resources/test.properties @@ -0,0 +1,3 @@ +proto.topic=proto-records +avro.topic=avro-records +schema.registry.url=mock://null-values-produce-consume-test \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 8026cf7a..71a658f8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -45,3 +45,4 @@ include 'tumbling-windows:kstreams' include 'udf:ksql' include 'versioned-ktables:kstreams' include 'window-final-result:kstreams' +include 'handling-null-values-in-avro-and-protobuf:kafka'
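A closing note on the wrapper approach mentioned at the end of the README above. The files in this pull request do not use wrappers; the following is only a minimal sketch of what `purchase.proto` and the producer code could look like if the `item` field were switched to the well-known `google.protobuf.StringValue` wrapper. The field change and the usage snippet are illustrative assumptions, not part of this PR.

```
syntax = "proto3";

package io.confluent.developer.proto;
option java_outer_classname = "PurchaseProto";
option java_multiple_files = true;

import "google/protobuf/wrappers.proto";

message Purchase {
  // A message-typed field tracks presence: it can be set or left unset.
  google.protobuf.StringValue item = 1;
  double total_cost = 2;
  string customer_id = 3;
}
```

```java
import com.google.protobuf.StringValue;
import io.confluent.developer.proto.Purchase;

public class WrapperSketch {
    public static void main(String[] args) {
        // Leaving the wrapper field unset (or calling clearItem()) models "no item";
        // hasItem() distinguishes that from an empty string.
        Purchase donation = Purchase.newBuilder()
                .setCustomerId("Customer Null")
                .setTotalCost(9.99)
                .build();                                  // item left unset

        String item = donation.hasItem() ? donation.getItem().getValue() : null;
        System.out.println("Item: " + item);               // prints "Item: null"

        // An explicit value is wrapped in StringValue rather than passed as a bare string.
        Purchase purchase = Purchase.newBuilder()
                .setCustomerId("Customer Shoes")
                .setItem(StringValue.of("shoes"))
                .setTotalCost(29.99)
                .build();
        System.out.println("Item: " + purchase.getItem().getValue());
    }
}
```

With a message-typed field, the generated Java API exposes `hasItem()`, so an absent item can be told apart from an empty string; that presence check is what the wrapper buys you over a plain proto3 `string`.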