diff --git a/examples/powertools-examples-core-utilities/cdk/app/pom.xml b/examples/powertools-examples-core-utilities/cdk/app/pom.xml index 0c4dec217..87bea4b38 100644 --- a/examples/powertools-examples-core-utilities/cdk/app/pom.xml +++ b/examples/powertools-examples-core-utilities/cdk/app/pom.xml @@ -41,7 +41,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 org.apache.logging.log4j diff --git a/examples/powertools-examples-core-utilities/gradle/build.gradle b/examples/powertools-examples-core-utilities/gradle/build.gradle index 37d9f4554..b86e49659 100644 --- a/examples/powertools-examples-core-utilities/gradle/build.gradle +++ b/examples/powertools-examples-core-utilities/gradle/build.gradle @@ -26,7 +26,7 @@ dependencies { implementation 'com.amazonaws:aws-lambda-java-core:1.2.2' implementation 'com.fasterxml.jackson.core:jackson-annotations:2.13.2' implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.2.2' - implementation 'com.amazonaws:aws-lambda-java-events:3.11.0' + implementation 'com.amazonaws:aws-lambda-java-events:3.16.0' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2' implementation 'org.aspectj:aspectjrt:1.9.20.1' aspect 'software.amazon.lambda:powertools-tracing:2.1.0' diff --git a/examples/powertools-examples-core-utilities/kotlin/build.gradle.kts b/examples/powertools-examples-core-utilities/kotlin/build.gradle.kts index de820300d..23c84a0d3 100644 --- a/examples/powertools-examples-core-utilities/kotlin/build.gradle.kts +++ b/examples/powertools-examples-core-utilities/kotlin/build.gradle.kts @@ -12,7 +12,7 @@ dependencies { implementation("com.amazonaws:aws-lambda-java-core:1.2.3") implementation("com.fasterxml.jackson.core:jackson-annotations:2.15.1") implementation("com.fasterxml.jackson.core:jackson-databind:2.15.3") - implementation("com.amazonaws:aws-lambda-java-events:3.11.3") + implementation("com.amazonaws:aws-lambda-java-events:3.16.0") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2") implementation("org.aspectj:aspectjrt:1.9.20.1") aspect("software.amazon.lambda:powertools-tracing:2.1.0") @@ -23,4 +23,4 @@ dependencies { kotlin { jvmToolchain(11) -} \ No newline at end of file +} diff --git a/examples/powertools-examples-core-utilities/sam-graalvm/pom.xml b/examples/powertools-examples-core-utilities/sam-graalvm/pom.xml index ead4625f5..d691c02a5 100644 --- a/examples/powertools-examples-core-utilities/sam-graalvm/pom.xml +++ b/examples/powertools-examples-core-utilities/sam-graalvm/pom.xml @@ -39,7 +39,7 @@ com.amazonaws aws-lambda-java-events - 3.11.3 + 3.16.0 org.aspectj diff --git a/examples/powertools-examples-core-utilities/sam/pom.xml b/examples/powertools-examples-core-utilities/sam/pom.xml index 44f171698..f37319ea1 100644 --- a/examples/powertools-examples-core-utilities/sam/pom.xml +++ b/examples/powertools-examples-core-utilities/sam/pom.xml @@ -38,7 +38,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 org.aspectj diff --git a/examples/powertools-examples-core-utilities/serverless/pom.xml b/examples/powertools-examples-core-utilities/serverless/pom.xml index 42d70ba76..1f4e0a2fc 100644 --- a/examples/powertools-examples-core-utilities/serverless/pom.xml +++ b/examples/powertools-examples-core-utilities/serverless/pom.xml @@ -38,7 +38,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 org.aspectj diff --git a/examples/powertools-examples-core-utilities/terraform/pom.xml b/examples/powertools-examples-core-utilities/terraform/pom.xml index 
a9ab410e3..6dcd2f8c6 100644 --- a/examples/powertools-examples-core-utilities/terraform/pom.xml +++ b/examples/powertools-examples-core-utilities/terraform/pom.xml @@ -38,7 +38,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 org.aspectj diff --git a/examples/powertools-examples-idempotency/pom.xml b/examples/powertools-examples-idempotency/pom.xml index 06a50c16b..c7ca6a058 100644 --- a/examples/powertools-examples-idempotency/pom.xml +++ b/examples/powertools-examples-idempotency/pom.xml @@ -52,7 +52,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 org.aspectj diff --git a/examples/powertools-examples-kafka/events/kafka-protobuf-event.json b/examples/powertools-examples-kafka/events/kafka-protobuf-event.json index e0547ad88..6f5ec58ae 100644 --- a/examples/powertools-examples-kafka/events/kafka-protobuf-event.json +++ b/examples/powertools-examples-kafka/events/kafka-protobuf-event.json @@ -30,7 +30,11 @@ { "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] } - ] + ], + "valueSchemaMetadata": { + "schemaId": "123", + "dataFormat": "PROTOBUF" + } }, { "topic": "mytopic", @@ -39,12 +43,34 @@ "timestamp": 1545084650989, "timestampType": "CREATE_TIME", "key": null, - "value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=", + "value": "BAIACOkHEgZMYXB0b3AZUrgehes/j0A=", "headers": [ { "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] } - ] + ], + "valueSchemaMetadata": { + "schemaId": "456", + "dataFormat": "PROTOBUF" + } + }, + { + "topic": "mytopic", + "partition": 0, + "offset": 18, + "timestamp": 1545084650990, + "timestampType": "CREATE_TIME", + "key": "NDI=", + "value": "AQjpBxIGTGFwdG9wGVK4HoXrP49A", + "headers": [ + { + "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] + } + ], + "valueSchemaMetadata": { + "schemaId": "12345678-1234-1234-1234-123456789012", + "dataFormat": "PROTOBUF" + } } ] } diff --git a/examples/powertools-examples-kafka/tools/pom.xml b/examples/powertools-examples-kafka/tools/pom.xml index 97231e5bd..80ed6c264 100644 --- a/examples/powertools-examples-kafka/tools/pom.xml +++ b/examples/powertools-examples-kafka/tools/pom.xml @@ -13,6 +13,7 @@ 11 1.12.0 4.31.0 + 4.0.0 @@ -26,6 +27,11 @@ protobuf-java ${protobuf.version} + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + com.fasterxml.jackson.core jackson-databind diff --git a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java index eecd3e1cc..aa5f6e330 100644 --- a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java +++ b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java @@ -2,8 +2,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Base64; +import org.apache.kafka.common.utils.ByteUtils; import org.demo.kafka.protobuf.ProtobufProduct; import com.google.protobuf.CodedOutputStream; @@ -19,37 +21,42 @@ private GenerateProtobufSamples() { } public static void main(String[] args) throws IOException { - // Create a single product that will be used for all three scenarios + // Create a single product that will be used for all four scenarios ProtobufProduct product = ProtobufProduct.newBuilder() .setId(1001) .setName("Laptop") .setPrice(999.99) .build(); - // Create three different serializations of the same product + // 
Create four different serializations of the same product String standardProduct = serializeAndEncode(product); - String productWithSimpleIndex = serializeWithSimpleMessageIndex(product); - String productWithComplexIndex = serializeWithComplexMessageIndex(product); + String productWithConfluentSimpleIndex = serializeWithConfluentSimpleMessageIndex(product); + String productWithConfluentComplexIndex = serializeWithConfluentComplexMessageIndex(product); + String productWithGlueMagicByte = serializeWithGlueMagicByte(product); // Serialize and encode an integer key (same for all records) String encodedKey = serializeAndEncodeInteger(42); // Print the results - System.out.println("Base64 encoded Protobuf products with different message index scenarios:"); - System.out.println("\n1. Standard Protobuf (no message index):"); + System.out.println("Base64 encoded Protobuf products with different scenarios:"); + System.out.println("\n1. Plain Protobuf (no schema registry):"); System.out.println("value: \"" + standardProduct + "\""); - System.out.println("\n2. Simple Message Index (single 0):"); - System.out.println("value: \"" + productWithSimpleIndex + "\""); + System.out.println("\n2. Confluent with Simple Message Index (optimized single 0):"); + System.out.println("value: \"" + productWithConfluentSimpleIndex + "\""); - System.out.println("\n3. Complex Message Index (array [1,0]):"); - System.out.println("value: \"" + productWithComplexIndex + "\""); + System.out.println("\n3. Confluent with Complex Message Index (array [1,0]):"); + System.out.println("value: \"" + productWithConfluentComplexIndex + "\""); + + System.out.println("\n4. Glue with Magic Byte:"); + System.out.println("value: \"" + productWithGlueMagicByte + "\""); // Print the merged event structure System.out.println("\n" + "=".repeat(80)); - System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS"); + System.out.println("MERGED EVENT WITH ALL FOUR SCENARIOS"); System.out.println("=".repeat(80)); - printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex); + printSampleEvent(encodedKey, standardProduct, productWithConfluentSimpleIndex, productWithConfluentComplexIndex, + productWithGlueMagicByte); } private static String serializeAndEncode(ProtobufProduct product) { @@ -57,39 +64,59 @@ private static String serializeAndEncode(ProtobufProduct product) { } /** - * Serializes a protobuf product with a simple Confluent message index (single 0). + * Serializes a protobuf product with a simple Confluent message index (optimized single 0). * Format: [0][protobuf_data] * * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format} */ - private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException { + private static String serializeWithConfluentSimpleMessageIndex(ProtobufProduct product) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos); - // Write simple message index (single 0) - codedOutput.writeUInt32NoTag(0); + // Write optimized simple message index for Confluent (single 0 byte for [0]) + baos.write(0); // Write the protobuf data - product.writeTo(codedOutput); + baos.write(product.toByteArray()); - codedOutput.flush(); return Base64.getEncoder().encodeToString(baos.toByteArray()); } /** * Serializes a protobuf product with a complex Confluent message index (array [1,0]). 
- * Format: [2][1][0][protobuf_data] where 2 is the array length + * Format: [2][1][0][protobuf_data] where 2 is the array length using varint encoding * * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format} */ - private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException { + private static String serializeWithConfluentComplexMessageIndex(ProtobufProduct product) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // Write complex message index array [1,0] using ByteUtils + ByteBuffer buffer = ByteBuffer.allocate(1024); + ByteUtils.writeVarint(2, buffer); // Array length + ByteUtils.writeVarint(1, buffer); // First index value + ByteUtils.writeVarint(0, buffer); // Second index value + + buffer.flip(); + byte[] indexData = new byte[buffer.remaining()]; + buffer.get(indexData); + baos.write(indexData); + + // Write the protobuf data + baos.write(product.toByteArray()); + + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + /** + * Serializes a protobuf product with Glue magic byte. + * Format: [1][protobuf_data] where 1 is the magic byte + */ + private static String serializeWithGlueMagicByte(ProtobufProduct product) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos); - // Write complex message index array [1,0] - codedOutput.writeUInt32NoTag(2); // Array length - codedOutput.writeUInt32NoTag(1); // First index value - codedOutput.writeUInt32NoTag(0); // Second index value + // Write Glue magic byte (single UInt32) + codedOutput.writeUInt32NoTag(1); // Write the protobuf data product.writeTo(codedOutput); @@ -103,8 +130,8 @@ private static String serializeAndEncodeInteger(Integer value) { return Base64.getEncoder().encodeToString(value.toString().getBytes()); } - private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct, - String complexIndexProduct) { + private static void printSampleEvent(String key, String standardProduct, String confluentSimpleProduct, + String confluentComplexProduct, String glueProduct) { System.out.println("{\n" + " \"eventSource\": \"aws:kafka\",\n" + " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" @@ -134,12 +161,16 @@ private static void printSampleEvent(String key, String standardProduct, String " \"timestamp\": 1545084650988,\n" + " \"timestampType\": \"CREATE_TIME\",\n" + " \"key\": \"" + key + "\",\n" + - " \"value\": \"" + simpleIndexProduct + "\",\n" + + " \"value\": \"" + confluentSimpleProduct + "\",\n" + " \"headers\": [\n" + " {\n" + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + " }\n" + - " ]\n" + + " ],\n" + + " \"valueSchemaMetadata\": {\n" + + " \"schemaId\": \"123\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " }\n" + " },\n" + " {\n" + " \"topic\": \"mytopic\",\n" + @@ -148,12 +179,34 @@ private static void printSampleEvent(String key, String standardProduct, String " \"timestamp\": 1545084650989,\n" + " \"timestampType\": \"CREATE_TIME\",\n" + " \"key\": null,\n" + - " \"value\": \"" + complexIndexProduct + "\",\n" + + " \"value\": \"" + confluentComplexProduct + "\",\n" + " \"headers\": [\n" + " {\n" + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + " }\n" + - " ]\n" + + " ],\n" + + " \"valueSchemaMetadata\": {\n" + 
+ " \"schemaId\": \"456\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"topic\": \"mytopic\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 18,\n" + + " \"timestamp\": 1545084650990,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": \"" + key + "\",\n" + + " \"value\": \"" + glueProduct + "\",\n" + + " \"headers\": [\n" + + " {\n" + + " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" + + " }\n" + + " ],\n" + + " \"valueSchemaMetadata\": {\n" + + " \"schemaId\": \"12345678-1234-1234-1234-123456789012\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " }\n" + " }\n" + " ]\n" + " }\n" + diff --git a/examples/powertools-examples-parameters/sam-graalvm/pom.xml b/examples/powertools-examples-parameters/sam-graalvm/pom.xml index 320ed42cd..643b8f134 100644 --- a/examples/powertools-examples-parameters/sam-graalvm/pom.xml +++ b/examples/powertools-examples-parameters/sam-graalvm/pom.xml @@ -39,7 +39,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 org.aspectj diff --git a/examples/powertools-examples-parameters/sam/pom.xml b/examples/powertools-examples-parameters/sam/pom.xml index ea8029b8c..e8ce66e11 100644 --- a/examples/powertools-examples-parameters/sam/pom.xml +++ b/examples/powertools-examples-parameters/sam/pom.xml @@ -38,7 +38,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 org.aspectj diff --git a/examples/powertools-examples-serialization/pom.xml b/examples/powertools-examples-serialization/pom.xml index f9408012c..eedce8b87 100644 --- a/examples/powertools-examples-serialization/pom.xml +++ b/examples/powertools-examples-serialization/pom.xml @@ -31,7 +31,7 @@ com.amazonaws aws-lambda-java-events - 3.15.0 + 3.16.0 diff --git a/pom.xml b/pom.xml index 0542295bb..1033c3f6e 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 2.2.0 UTF-8 1.2.3 - 3.15.0 + 3.16.0 1.1.5 3.13.0 1.9.7 diff --git a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializer.java b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializer.java index 8d0fc8f61..3de8320a5 100644 --- a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializer.java +++ b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializer.java @@ -41,6 +41,11 @@ abstract class AbstractKafkaDeserializer implements PowertoolsDeserializer { protected static final ObjectMapper objectMapper = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static final Integer GLUE_SCHEMA_ID_LENGTH = 36; + + public enum SchemaRegistryType { + CONFLUENT, GLUE, NONE + } /** * Deserialize JSON from InputStream into ConsumerRecords @@ -170,8 +175,8 @@ private ConsumerRecord convertToConsumerRecord( Class keyType, Class valueType) { - K key = deserializeField(eventRecord.getKey(), keyType, "key"); - V value = deserializeField(eventRecord.getValue(), valueType, "value"); + K key = deserializeField(eventRecord.getKey(), keyType, "key", extractSchemaRegistryType(eventRecord)); + V value = deserializeField(eventRecord.getValue(), valueType, "value", extractSchemaRegistryType(eventRecord)); Headers headers = extractHeaders(eventRecord); return new ConsumerRecord<>( @@ -190,14 +195,15 @@ private ConsumerRecord convertToConsumerRecord( Optional.empty()); } - private T deserializeField(String encodedData, Class type, String fieldName) 
{ + private T deserializeField(String encodedData, Class type, String fieldName, + SchemaRegistryType schemaRegistryType) { if (encodedData == null) { return null; } try { byte[] decodedBytes = Base64.getDecoder().decode(encodedData); - return deserialize(decodedBytes, type); + return deserialize(decodedBytes, type, schemaRegistryType); } catch (Exception e) { throw new RuntimeException("Failed to deserialize Kafka record " + fieldName + ".", e); } @@ -218,28 +224,60 @@ private Headers extractHeaders(KafkaEvent.KafkaEventRecord eventRecord) { return headers; } + private String extractKeySchemaId(KafkaEvent.KafkaEventRecord eventRecord) { + if (eventRecord.getKeySchemaMetadata() != null) { + return eventRecord.getKeySchemaMetadata().getSchemaId(); + } + return null; + } + + private String extractValueSchemaId(KafkaEvent.KafkaEventRecord eventRecord) { + if (eventRecord.getValueSchemaMetadata() != null) { + return eventRecord.getValueSchemaMetadata().getSchemaId(); + } + return null; + } + + private SchemaRegistryType extractSchemaRegistryType(KafkaEvent.KafkaEventRecord eventRecord) { + // This method is used for both key and value, so we try to extract the schema id from both fields + String schemaId = extractValueSchemaId(eventRecord); + if (schemaId == null) { + schemaId = extractKeySchemaId(eventRecord); + } + + if (schemaId == null) { + return SchemaRegistryType.NONE; + } + + return schemaId.length() == GLUE_SCHEMA_ID_LENGTH ? SchemaRegistryType.GLUE : SchemaRegistryType.CONFLUENT; + } + /** * Template method to be implemented by subclasses for specific deserialization logic - * for complex types (non-primitives). - * + * for complex types (non-primitives) and for specific Schema Registry type. + * * @param The type to deserialize to * @param data The byte array to deserialize coming from the base64 decoded Kafka field * @param type The class type to deserialize to + * @param schemaRegistryType Schema Registry type * @return The deserialized object * @throws IOException If deserialization fails */ - protected abstract T deserializeObject(byte[] data, Class type) throws IOException; + protected abstract T deserializeObject(byte[] data, Class type, SchemaRegistryType schemaRegistryType) + throws IOException; /** - * Main deserialize method that handles primitive types and delegates to subclasses for complex types. - * + * Main deserialize method that handles primitive types and delegates to subclasses for complex types and + * for specific Schema Registry type. 
+ * * @param The type to deserialize to * @param data The byte array to deserialize * @param type The class type to deserialize to + * @param schemaRegistryType Schema Registry type * @return The deserialized object * @throws IOException If deserialization fails */ - private T deserialize(byte[] data, Class type) throws IOException { + private T deserialize(byte[] data, Class type, SchemaRegistryType schemaRegistryType) throws IOException { // First try to deserialize as a primitive type T result = deserializePrimitive(data, type); if (result != null) { @@ -247,7 +285,7 @@ private T deserialize(byte[] data, Class type) throws IOException { } // Delegate to subclass for complex type deserialization - return deserializeObject(data, type); + return deserializeObject(data, type, schemaRegistryType); } /** diff --git a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializer.java b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializer.java index ddf09d4ff..70e4affac 100644 --- a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializer.java +++ b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializer.java @@ -26,7 +26,8 @@ public class KafkaAvroDeserializer extends AbstractKafkaDeserializer { @Override - protected T deserializeObject(byte[] data, Class type) throws IOException { + protected T deserializeObject(byte[] data, Class type, SchemaRegistryType schemaRegistryType) + throws IOException { // If no Avro generated class is passed we cannot deserialize using Avro if (SpecificRecordBase.class.isAssignableFrom(type)) { try { diff --git a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializer.java b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializer.java index ed64f3786..733fe5d7d 100644 --- a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializer.java +++ b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializer.java @@ -21,7 +21,8 @@ public class KafkaJsonDeserializer extends AbstractKafkaDeserializer { @Override - protected T deserializeObject(byte[] data, Class type) throws IOException { + protected T deserializeObject(byte[] data, Class type, SchemaRegistryType schemaRegistryType) + throws IOException { String decodedStr = new String(data, StandardCharsets.UTF_8); return objectMapper.readValue(decodedStr, type); diff --git a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java index c15be552f..1a8085a61 100644 --- a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java +++ b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java @@ -13,7 +13,10 @@ package software.amazon.lambda.powertools.kafka.serializers; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import org.apache.kafka.common.utils.ByteUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +26,8 @@ /** * Deserializer for Kafka records using Protocol Buffers format. 
- * Supports both standard protobuf serialization and Confluent Schema Registry serialization using messages indices. + * Supports both standard protobuf serialization and Confluent / Glue Schema Registry serialization using message + * indices. * * For Confluent-serialized data, assumes the magic byte and schema ID have already been stripped * by the Kafka ESM, leaving only the message index (if present) and protobuf data. @@ -33,19 +37,22 @@ public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtobufDeserializer.class); + private static final String PROTOBUF_PARSER_METHOD = "parser"; @Override - @SuppressWarnings("unchecked") - protected T deserializeObject(byte[] data, Class type) throws IOException { - // If no Protobuf generated class is passed we cannot deserialize using Protobuf + protected T deserializeObject(byte[] data, Class type, SchemaRegistryType schemaRegistryType) + throws IOException { + // If no Protobuf generated class is passed, we cannot deserialize using Protobuf if (Message.class.isAssignableFrom(type)) { try { - // Get the parser from the generated Protobuf class - Parser parser = (Parser) type.getMethod("parser").invoke(null); - - // Try to deserialize the data, handling potential Confluent message indices - Message message = deserializeWithMessageIndexHandling(data, parser); - return type.cast(message); + switch (schemaRegistryType) { + case GLUE: + return glueDeserializer(data, type); + case CONFLUENT: + return confluentDeserializer(data, type); + default: + return defaultDeserializer(data, type); + } } catch (Exception e) { throw new IOException("Failed to deserialize Protobuf data.", e); } @@ -56,44 +63,48 @@ protected T deserializeObject(byte[] data, Class type) throws IOException } } - private Message deserializeWithMessageIndexHandling(byte[] data, Parser parser) throws IOException { + @SuppressWarnings("unchecked") + private T defaultDeserializer(byte[] data, Class type) throws IOException { try { - LOGGER.debug("Attempting to deserialize as standard protobuf data"); - return parser.parseFrom(data); + LOGGER.debug("Using default Protobuf deserializer"); + Parser parser = (Parser) type.getMethod(PROTOBUF_PARSER_METHOD).invoke(null); + Message message = parser.parseFrom(data); + return type.cast(message); } catch (Exception e) { - LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling"); - return deserializeWithMessageIndex(data, parser); + throw new IOException("Failed to deserialize Protobuf data.", e); } } - private Message deserializeWithMessageIndex(byte[] data, Parser parser) throws IOException { - CodedInputStream codedInputStream = CodedInputStream.newInstance(data); - - try { - // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format - // Read the first varint - this could be: - // 1. A single 0 (simple case - first message type) - // 2. 
The length of the message index array (complex case) - int firstValue = codedInputStream.readUInt32(); + @SuppressWarnings("unchecked") + private T confluentDeserializer(byte[] data, Class type) + throws IOException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + LOGGER.debug("Using Confluent Deserializer"); + Parser parser = (Parser) type.getMethod(PROTOBUF_PARSER_METHOD).invoke(null); + ByteBuffer buffer = ByteBuffer.wrap(data); + int size = ByteUtils.readVarint(buffer); - if (firstValue == 0) { - // Simple case: Single 0 byte means first message type - LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf"); - return parser.parseFrom(codedInputStream); - } else { - // Complex case: firstValue is the length of the message index array - LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values", - firstValue, firstValue); - for (int i = 0; i < firstValue; i++) { - codedInputStream.readUInt32(); // Skip each message index value - } - // Now the remaining data should be the actual protobuf message - LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf"); - return parser.parseFrom(codedInputStream); + // Only if the size is greater than zero, continue reading varInt. + // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + if (size > 0) { + for (int i = 0; i < size; i++) { + ByteUtils.readVarint(buffer); } - - } catch (Exception e) { - throw new IOException("Failed to parse protobuf data with or without message index", e); } + Message message = parser.parseFrom(buffer); + return type.cast(message); + } + + @SuppressWarnings("unchecked") + private T glueDeserializer(byte[] data, Class type) + throws IOException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + LOGGER.debug("Using Glue Deserializer"); + CodedInputStream codedInputStream = CodedInputStream.newInstance(data); + Parser parser = (Parser) type.getMethod(PROTOBUF_PARSER_METHOD).invoke(null); + + // Seek one byte forward. 
Based on Glue Proto deserializer implementation + codedInputStream.readUInt32(); + + Message message = parser.parseFrom(codedInputStream); + return type.cast(message); } } diff --git a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializerTest.java b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializerTest.java index 512058bca..b40a2a2b0 100644 --- a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializerTest.java +++ b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializerTest.java @@ -459,10 +459,117 @@ void shouldThrowExceptionWhenConvertingEmptyStringToChar(InputType inputType) { } } + @ParameterizedTest + @MethodSource("inputTypes") + void shouldHandleGlueSchemaMetadata(InputType inputType) throws IOException { + // Given + TestProductPojo product = new TestProductPojo(123, "Test Product", 99.99, null); + String productJson = objectMapper.writeValueAsString(product); + String base64Value = Base64.getEncoder().encodeToString(productJson.getBytes()); + + String kafkaJson = "{\n" + + " \"eventSource\": \"aws:kafka\",\n" + + " \"records\": {\n" + + " \"test-topic-1\": [\n" + + " {\n" + + " \"topic\": \"test-topic-1\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 15,\n" + + " \"timestamp\": 1545084650987,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": null,\n" + + " \"value\": \"" + base64Value + "\",\n" + + " \"headers\": [],\n" + + " \"keySchemaMetadata\": {\n" + + " \"schemaId\": \"12345678-1234-1234-1234-123456789012\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " },\n" + + " \"valueSchemaMetadata\": {\n" + + " \"schemaId\": \"87654321-4321-4321-4321-210987654321\",\n" + + " \"dataFormat\": \"PROTOBUF\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + Type type = TestUtils.createConsumerRecordsType(String.class, TestProductPojo.class); + + // When + ConsumerRecords records; + if (inputType == InputType.INPUT_STREAM) { + ByteArrayInputStream inputStream = new ByteArrayInputStream(kafkaJson.getBytes()); + records = deserializer.fromJson(inputStream, type); + } else { + records = deserializer.fromJson(kafkaJson, type); + } + + // Then + assertThat(records).isNotNull(); + TopicPartition tp = new TopicPartition("test-topic-1", 0); + List> topicRecords = records.records(tp); + assertThat(topicRecords).hasSize(1); + + ConsumerRecord consumerRecord = topicRecords.get(0); + assertThat(consumerRecord.value()).isNotNull(); + assertThat(consumerRecord.value().getId()).isEqualTo(123); + } + + @ParameterizedTest + @MethodSource("inputTypes") + void shouldHandleConfluentSchemaMetadata(InputType inputType) throws IOException { + // Given + TestProductPojo product = new TestProductPojo(456, "Confluent Product", 199.99, null); + String productJson = objectMapper.writeValueAsString(product); + String base64Value = Base64.getEncoder().encodeToString(productJson.getBytes()); + + String kafkaJson = "{\n" + + " \"eventSource\": \"aws:kafka\",\n" + + " \"records\": {\n" + + " \"test-topic-1\": [\n" + + " {\n" + + " \"topic\": \"test-topic-1\",\n" + + " \"partition\": 0,\n" + + " \"offset\": 15,\n" + + " \"timestamp\": 1545084650987,\n" + + " \"timestampType\": \"CREATE_TIME\",\n" + + " \"key\": null,\n" + + " \"value\": \"" + base64Value + "\",\n" + + " \"headers\": [],\n" + + " \"keySchemaMetadata\": {\n" + + " \"schemaId\": \"123\",\n" + + " \"dataFormat\": 
\"PROTOBUF\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + Type type = TestUtils.createConsumerRecordsType(String.class, TestProductPojo.class); + + // When + ConsumerRecords records; + if (inputType == InputType.INPUT_STREAM) { + ByteArrayInputStream inputStream = new ByteArrayInputStream(kafkaJson.getBytes()); + records = deserializer.fromJson(inputStream, type); + } else { + records = deserializer.fromJson(kafkaJson, type); + } + + // Then + assertThat(records).isNotNull(); + TopicPartition tp = new TopicPartition("test-topic-1", 0); + List> topicRecords = records.records(tp); + assertThat(topicRecords).hasSize(1); + + ConsumerRecord consumerRecord = topicRecords.get(0); + assertThat(consumerRecord.value()).isNotNull(); + assertThat(consumerRecord.value().getId()).isEqualTo(456); + } + // Test implementation of AbstractKafkaDeserializer - private static class TestDeserializer extends AbstractKafkaDeserializer { + private static final class TestDeserializer extends AbstractKafkaDeserializer { @Override - protected T deserializeObject(byte[] data, Class type) throws IOException { + protected T deserializeObject(byte[] data, Class type, SchemaRegistryType schemaRegistryType) + throws IOException { return objectMapper.readValue(data, type); } } diff --git a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializerTest.java b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializerTest.java index a0b59b136..f501d7d98 100644 --- a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializerTest.java +++ b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializerTest.java @@ -38,9 +38,10 @@ void shouldThrowExceptionWhenTypeIsNotAvroSpecificRecord() { byte[] data = new byte[] { 1, 2, 3 }; // When/Then - assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class)) - .isInstanceOf(IOException.class) - .hasMessageContaining("Unsupported type for Avro deserialization"); + assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Unsupported type for Avro deserialization"); } @Test @@ -50,7 +51,8 @@ void shouldDeserializeValidAvroData() throws IOException { byte[] avroData = serializeAvro(product); // When - TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class); + TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE); // Then assertThat(result).isNotNull(); @@ -65,9 +67,10 @@ void shouldThrowExceptionWhenDeserializingInvalidAvroData() { byte[] invalidAvroData = new byte[] { 1, 2, 3, 4, 5 }; // When/Then - assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class)) - .isInstanceOf(IOException.class) - .hasMessageContaining("Failed to deserialize Avro data"); + assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Failed to deserialize Avro data"); } } diff --git a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializerTest.java 
b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializerTest.java index 0cfb2498b..c01e09d8d 100644 --- a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializerTest.java +++ b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializerTest.java @@ -42,8 +42,9 @@ void shouldThrowExceptionWhenTypeIsNotSupportedForJson() { byte[] data = new byte[] { 1, 2, 3 }; // When/Then - assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class)) - .isInstanceOf(JsonParseException.class); + assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE)) + .isInstanceOf(JsonParseException.class); } @Test @@ -53,7 +54,8 @@ void shouldDeserializeValidJsonData() throws IOException { byte[] jsonData = objectMapper.writeValueAsBytes(product); // When - TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class); + TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE); // Then assertThat(result).isNotNull(); diff --git a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java index 3315e1172..34a376947 100644 --- a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java +++ b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java @@ -17,7 +17,9 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.kafka.common.utils.ByteUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -40,9 +42,10 @@ void shouldThrowExceptionWhenTypeIsNotProtobufMessage() { byte[] data = new byte[] { 1, 2, 3 }; // When/Then - assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class)) - .isInstanceOf(IOException.class) - .hasMessageContaining("Unsupported type for Protobuf deserialization"); + assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Unsupported type for Protobuf deserialization"); } @Test @@ -56,7 +59,8 @@ void shouldDeserializeValidProtobufData() throws IOException { byte[] protobufData = product.toByteArray(); // When - TestProduct result = deserializer.deserializeObject(protobufData, TestProduct.class); + TestProduct result = deserializer.deserializeObject(protobufData, TestProduct.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE); // Then assertThat(result).isNotNull(); @@ -71,13 +75,14 @@ void shouldThrowExceptionWhenDeserializingInvalidProtobufData() { byte[] invalidProtobufData = new byte[] { 1, 2, 3, 4, 5 }; // When/Then - assertThatThrownBy(() -> deserializer.deserializeObject(invalidProtobufData, TestProduct.class)) - .isInstanceOf(IOException.class) - .hasMessageContaining("Failed to deserialize Protobuf data"); + assertThatThrownBy(() -> deserializer.deserializeObject(invalidProtobufData, TestProduct.class, + AbstractKafkaDeserializer.SchemaRegistryType.NONE)) + .isInstanceOf(IOException.class) + 
.hasMessageContaining("Failed to deserialize Protobuf data"); } @Test - void shouldDeserializeProtobufDataWithSimpleMessageIndex() throws IOException { + void shouldDeserializeProtobufDataWithSimpleMessageIndexGlue() throws IOException { // Given TestProduct product = TestProduct.newBuilder() .setId(456) @@ -86,10 +91,11 @@ void shouldDeserializeProtobufDataWithSimpleMessageIndex() throws IOException { .build(); // Create protobuf data with simple message index (single 0) - byte[] protobufDataWithSimpleIndex = createProtobufDataWithSimpleMessageIndex(product); + byte[] protobufDataWithSimpleIndex = createProtobufDataWithGlueMagicByte(product); // When - TestProduct result = deserializer.deserializeObject(protobufDataWithSimpleIndex, TestProduct.class); + TestProduct result = deserializer.deserializeObject(protobufDataWithSimpleIndex, TestProduct.class, + AbstractKafkaDeserializer.SchemaRegistryType.GLUE); // Then assertThat(result).isNotNull(); @@ -107,11 +113,12 @@ void shouldDeserializeProtobufDataWithComplexMessageIndex() throws IOException { .setPrice(299.99) .build(); - // Create protobuf data with complex message index (array [1,0]) - byte[] protobufDataWithComplexIndex = createProtobufDataWithComplexMessageIndex(product); + // Create protobuf data with complex message index (array [1,0]) + byte[] protobufDataWithComplexIndex = createProtobufDataWithComplexMessageIndexConfluent(product); // When - TestProduct result = deserializer.deserializeObject(protobufDataWithComplexIndex, TestProduct.class); + TestProduct result = deserializer.deserializeObject(protobufDataWithComplexIndex, TestProduct.class, + AbstractKafkaDeserializer.SchemaRegistryType.CONFLUENT); // Then assertThat(result).isNotNull(); @@ -120,12 +127,35 @@ void shouldDeserializeProtobufDataWithComplexMessageIndex() throws IOException { assertThat(result.getPrice()).isEqualTo(299.99); } - private byte[] createProtobufDataWithSimpleMessageIndex(TestProduct product) throws IOException { + @Test + void shouldDeserializeProtobufDataWithSimpleMessageIndexConfluent() throws IOException { + // Given + TestProduct product = TestProduct.newBuilder() + .setId(789) + .setName("Simple Index Product") + .setPrice(299.99) + .build(); + + // Create protobuf data with simple message index for Confluent + byte[] protobufDataWithSimpleIndex = createProtobufDataWithSimpleMessageIndexConfluent(product); + + // When + TestProduct result = deserializer.deserializeObject(protobufDataWithSimpleIndex, TestProduct.class, + AbstractKafkaDeserializer.SchemaRegistryType.CONFLUENT); + + // Then + assertThat(result).isNotNull(); + assertThat(result.getId()).isEqualTo(789); + assertThat(result.getName()).isEqualTo("Simple Index Product"); + assertThat(result.getPrice()).isEqualTo(299.99); + } + + private byte[] createProtobufDataWithGlueMagicByte(TestProduct product) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos); - // Write simple message index (single 0) - codedOutput.writeUInt32NoTag(0); + // Write simple message index for Glue (single UInt32) + codedOutput.writeUInt32NoTag(1); // Write the protobuf data product.writeTo(codedOutput); @@ -134,19 +164,37 @@ private byte[] createProtobufDataWithSimpleMessageIndex(TestProduct product) thr return baos.toByteArray(); } - private byte[] createProtobufDataWithComplexMessageIndex(TestProduct product) throws IOException { + private byte[] createProtobufDataWithSimpleMessageIndexConfluent(TestProduct product) 
throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos); - // Write complex message index array [1,0] - codedOutput.writeUInt32NoTag(2); // Array length - codedOutput.writeUInt32NoTag(1); // First index value - codedOutput.writeUInt32NoTag(0); // Second index value + // Write optimized simple message index for Confluent (single 0 byte for [0]) + // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + baos.write(0); // Write the protobuf data - product.writeTo(codedOutput); + baos.write(product.toByteArray()); + + return baos.toByteArray(); + } + + private byte[] createProtobufDataWithComplexMessageIndexConfluent(TestProduct product) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // Write complex message index array [1,0] using ByteUtils + // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + ByteBuffer buffer = ByteBuffer.allocate(1024); + ByteUtils.writeVarint(2, buffer); // Array length + ByteUtils.writeVarint(1, buffer); // First index value + ByteUtils.writeVarint(0, buffer); // Second index value + + buffer.flip(); + byte[] indexData = new byte[buffer.remaining()]; + buffer.get(indexData); + baos.write(indexData); + + // Write the protobuf data + baos.write(product.toByteArray()); - codedOutput.flush(); return baos.toByteArray(); } }
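
Note for reviewers: the registry detection added in AbstractKafkaDeserializer keys off the schemaId in the record's schema metadata, which is why the sample event mixes numeric IDs ("123", "456") with a UUID. Below is a minimal, self-contained sketch of that heuristic; RegistryTypeSketch and registryTypeOf are hypothetical names used only for illustration, while the real logic lives in extractSchemaRegistryType with GLUE_SCHEMA_ID_LENGTH = 36.

// Illustrative sketch only: mirrors the length-based heuristic added in
// AbstractKafkaDeserializer#extractSchemaRegistryType; names here are hypothetical.
public final class RegistryTypeSketch {

    enum SchemaRegistryType { CONFLUENT, GLUE, NONE }

    static SchemaRegistryType registryTypeOf(String schemaId) {
        if (schemaId == null) {
            return SchemaRegistryType.NONE; // no schema metadata on the record
        }
        // Glue schema version IDs are UUIDs (36 characters, e.g. "12345678-1234-1234-1234-123456789012");
        // Confluent schema IDs are short numeric strings (e.g. "123").
        return schemaId.length() == 36 ? SchemaRegistryType.GLUE : SchemaRegistryType.CONFLUENT;
    }

    public static void main(String[] args) {
        System.out.println(registryTypeOf("123"));                                  // CONFLUENT
        System.out.println(registryTypeOf("12345678-1234-1234-1234-123456789012")); // GLUE
        System.out.println(registryTypeOf(null));                                   // NONE
    }
}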
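
The wire formats exercised by GenerateProtobufSamples and KafkaProtobufDeserializer differ only in the bytes that precede the protobuf payload: Confluent prepends a varint message-index list (a single 0 in the common case, or a count followed by that many indexes), while the Glue path here skips one leading varint before parsing. The following is a self-contained sketch of that framing, assuming a hand-rolled varint reader instead of kafka-clients ByteUtils and returning the raw payload rather than invoking a generated protobuf parser; SchemaFramingSketch and its method names are illustrative, not part of the Powertools API.

import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative sketch only: mirrors the prefix handling in KafkaProtobufDeserializer,
// but with a local varint reader so it runs without kafka-clients or generated protobuf classes.
public final class SchemaFramingSketch {

    // Reads an unsigned varint (protobuf/Kafka style: 7 data bits per byte, MSB = continuation).
    static int readVarint(ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buffer.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
        }
    }

    // Confluent wire format (after the ESM strips magic byte + schema ID): a varint count of
    // message indexes followed by that many varints, then the protobuf payload.
    static byte[] stripConfluentIndexes(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int count = readVarint(buffer); // 0 means "first message type", nothing more to skip
        for (int i = 0; i < count; i++) {
            readVarint(buffer); // skip each message index value
        }
        return Arrays.copyOfRange(data, buffer.position(), data.length);
    }

    // Glue-framed records (as produced by the sample generator in this PR): one leading
    // varint is skipped before the protobuf payload, mirroring codedInputStream.readUInt32().
    static byte[] stripGlueMagicByte(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        readVarint(buffer); // assumption: a single leading varint precedes the payload
        return Arrays.copyOfRange(data, buffer.position(), data.length);
    }

    public static void main(String[] args) {
        byte[] payload = { 0x08, 0x2A };                             // stand-in protobuf bytes
        byte[] confluentSimple = { 0x00, 0x08, 0x2A };               // [0][payload]
        byte[] confluentComplex = { 0x02, 0x01, 0x00, 0x08, 0x2A };  // [2][1][0][payload]
        byte[] glue = { 0x01, 0x08, 0x2A };                          // [1][payload]

        System.out.println(Arrays.equals(payload, stripConfluentIndexes(confluentSimple)));  // true
        System.out.println(Arrays.equals(payload, stripConfluentIndexes(confluentComplex))); // true
        System.out.println(Arrays.equals(payload, stripGlueMagicByte(glue)));                // true
    }
}

Running main prints true three times, confirming that each prefix variant strips down to the same payload bytes.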