Skip to content

fix(kafka): Handle message indices in proto data also for Glue Schema Registry #1907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies {
implementation 'com.amazonaws:aws-lambda-java-core:1.2.2'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.13.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.2.2'
implementation 'com.amazonaws:aws-lambda-java-events:3.11.0'
implementation 'com.amazonaws:aws-lambda-java-events:3.16.0'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2'
implementation 'org.aspectj:aspectjrt:1.9.20.1'
aspect 'software.amazon.lambda:powertools-tracing:2.1.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies {
implementation("com.amazonaws:aws-lambda-java-core:1.2.3")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.15.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.3")
implementation("com.amazonaws:aws-lambda-java-events:3.11.3")
implementation("com.amazonaws:aws-lambda-java-events:3.16.0")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2")
implementation("org.aspectj:aspectjrt:1.9.20.1")
aspect("software.amazon.lambda:powertools-tracing:2.1.0")
Expand All @@ -23,4 +23,4 @@ dependencies {

kotlin {
jvmToolchain(11)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.11.3</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
2 changes: 1 addition & 1 deletion examples/powertools-examples-core-utilities/sam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
2 changes: 1 addition & 1 deletion examples/powertools-examples-idempotency/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
],
"valueSchemaMetadata": {
"schemaId": "123",
"dataFormat": "PROTOBUF"
}
},
{
"topic": "mytopic",
Expand All @@ -39,12 +43,34 @@
"timestamp": 1545084650989,
"timestampType": "CREATE_TIME",
"key": null,
"value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
"value": "BAIACOkHEgZMYXB0b3AZUrgehes/j0A=",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
],
"valueSchemaMetadata": {
"schemaId": "456",
"dataFormat": "PROTOBUF"
}
},
{
"topic": "mytopic",
"partition": 0,
"offset": 18,
"timestamp": 1545084650990,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "AQjpBxIGTGFwdG9wGVK4HoXrP49A",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
],
"valueSchemaMetadata": {
"schemaId": "12345678-1234-1234-1234-123456789012",
"dataFormat": "PROTOBUF"
}
}
]
}
Expand Down
6 changes: 6 additions & 0 deletions examples/powertools-examples-kafka/tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<maven.compiler.target>11</maven.compiler.target>
<avro.version>1.12.0</avro.version>
<protobuf.version>4.31.0</protobuf.version>
<kafka-clients.version>4.0.0</kafka-clients.version>
</properties>

<dependencies>
Expand All @@ -26,6 +27,11 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;

import org.apache.kafka.common.utils.ByteUtils;
import org.demo.kafka.protobuf.ProtobufProduct;

import com.google.protobuf.CodedOutputStream;
Expand All @@ -19,77 +21,102 @@ private GenerateProtobufSamples() {
}

public static void main(String[] args) throws IOException {
// Create a single product that will be used for all three scenarios
// Create a single product that will be used for all four scenarios
ProtobufProduct product = ProtobufProduct.newBuilder()
.setId(1001)
.setName("Laptop")
.setPrice(999.99)
.build();

// Create three different serializations of the same product
// Create four different serializations of the same product
String standardProduct = serializeAndEncode(product);
String productWithSimpleIndex = serializeWithSimpleMessageIndex(product);
String productWithComplexIndex = serializeWithComplexMessageIndex(product);
String productWithConfluentSimpleIndex = serializeWithConfluentSimpleMessageIndex(product);
String productWithConfluentComplexIndex = serializeWithConfluentComplexMessageIndex(product);
String productWithGlueMagicByte = serializeWithGlueMagicByte(product);

// Serialize and encode an integer key (same for all records)
String encodedKey = serializeAndEncodeInteger(42);

// Print the results
System.out.println("Base64 encoded Protobuf products with different message index scenarios:");
System.out.println("\n1. Standard Protobuf (no message index):");
System.out.println("Base64 encoded Protobuf products with different scenarios:");
System.out.println("\n1. Plain Protobuf (no schema registry):");
System.out.println("value: \"" + standardProduct + "\"");

System.out.println("\n2. Simple Message Index (single 0):");
System.out.println("value: \"" + productWithSimpleIndex + "\"");
System.out.println("\n2. Confluent with Simple Message Index (optimized single 0):");
System.out.println("value: \"" + productWithConfluentSimpleIndex + "\"");

System.out.println("\n3. Complex Message Index (array [1,0]):");
System.out.println("value: \"" + productWithComplexIndex + "\"");
System.out.println("\n3. Confluent with Complex Message Index (array [1,0]):");
System.out.println("value: \"" + productWithConfluentComplexIndex + "\"");

System.out.println("\n4. Glue with Magic Byte:");
System.out.println("value: \"" + productWithGlueMagicByte + "\"");

// Print the merged event structure
System.out.println("\n" + "=".repeat(80));
System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS");
System.out.println("MERGED EVENT WITH ALL FOUR SCENARIOS");
System.out.println("=".repeat(80));
printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex);
printSampleEvent(encodedKey, standardProduct, productWithConfluentSimpleIndex, productWithConfluentComplexIndex,
productWithGlueMagicByte);
}

/**
 * Encodes the product's plain Protobuf serialization as a Base64 string.
 * No prefix bytes are added in front of the serialized message.
 *
 * @param product the message to serialize
 * @return Base64 text of {@code product.toByteArray()}
 */
private static String serializeAndEncode(ProtobufProduct product) {
    byte[] wireBytes = product.toByteArray();
    Base64.Encoder encoder = Base64.getEncoder();
    return encoder.encodeToString(wireBytes);
}

/**
* Serializes a protobuf product with a simple Confluent message index (single 0).
* Serializes a protobuf product with a simple Confluent message index (optimized single 0).
* Format: [0][protobuf_data]
*
* @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
*/
private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException {
private static String serializeWithConfluentSimpleMessageIndex(ProtobufProduct product) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);

// Write simple message index (single 0)
codedOutput.writeUInt32NoTag(0);
// Write optimized simple message index for Confluent (single 0 byte for [0])
baos.write(0);

// Write the protobuf data
product.writeTo(codedOutput);
baos.write(product.toByteArray());

codedOutput.flush();
return Base64.getEncoder().encodeToString(baos.toByteArray());
}

/**
* Serializes a protobuf product with a complex Confluent message index (array [1,0]).
* Format: [2][1][0][protobuf_data] where 2 is the array length
* Format: [2][1][0][protobuf_data] where 2 is the array length; each value is written as a zig-zag varint (bytes 0x04 0x02 0x00 on the wire)
*
* @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
*/
private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException {
private static String serializeWithConfluentComplexMessageIndex(ProtobufProduct product) throws IOException {
    // Message-index frame: element count (2) followed by the index path [1, 0].
    final int[] indexFrame = { 2, 1, 0 };

    // ByteUtils.writeVarint applies zig-zag encoding, so these values are
    // emitted as the bytes 0x04 0x02 0x00 rather than 0x02 0x01 0x00.
    ByteBuffer scratch = ByteBuffer.allocate(1024);
    for (int entry : indexFrame) {
        ByteUtils.writeVarint(entry, scratch);
    }

    // Copy out only the bytes that were actually written.
    scratch.flip();
    byte[] indexBytes = new byte[scratch.remaining()];
    scratch.get(indexBytes);

    // Final payload: index frame first, then the serialized protobuf message.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(indexBytes);
    out.write(product.toByteArray());

    return Base64.getEncoder().encodeToString(out.toByteArray());
}

/**
* Serializes a protobuf product with Glue magic byte.
* Format: [1][protobuf_data] where 1 is the magic byte
*/
private static String serializeWithGlueMagicByte(ProtobufProduct product) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);

// Write complex message index array [1,0]
codedOutput.writeUInt32NoTag(2); // Array length
codedOutput.writeUInt32NoTag(1); // First index value
codedOutput.writeUInt32NoTag(0); // Second index value
// Write Glue magic byte (single UInt32)
codedOutput.writeUInt32NoTag(1);

// Write the protobuf data
product.writeTo(codedOutput);
Expand All @@ -103,8 +130,8 @@ private static String serializeAndEncodeInteger(Integer value) {
return Base64.getEncoder().encodeToString(value.toString().getBytes());
}

private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct,
String complexIndexProduct) {
private static void printSampleEvent(String key, String standardProduct, String confluentSimpleProduct,
String confluentComplexProduct, String glueProduct) {
System.out.println("{\n" +
" \"eventSource\": \"aws:kafka\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
Expand Down Expand Up @@ -134,12 +161,16 @@ private static void printSampleEvent(String key, String standardProduct, String
" \"timestamp\": 1545084650988,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + simpleIndexProduct + "\",\n" +
" \"value\": \"" + confluentSimpleProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" ],\n" +
" \"valueSchemaMetadata\": {\n" +
" \"schemaId\": \"123\",\n" +
" \"dataFormat\": \"PROTOBUF\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
Expand All @@ -148,12 +179,34 @@ private static void printSampleEvent(String key, String standardProduct, String
" \"timestamp\": 1545084650989,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": null,\n" +
" \"value\": \"" + complexIndexProduct + "\",\n" +
" \"value\": \"" + confluentComplexProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ]\n" +
" ],\n" +
" \"valueSchemaMetadata\": {\n" +
" \"schemaId\": \"456\",\n" +
" \"dataFormat\": \"PROTOBUF\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"topic\": \"mytopic\",\n" +
" \"partition\": 0,\n" +
" \"offset\": 18,\n" +
" \"timestamp\": 1545084650990,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + glueProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
" }\n" +
" ],\n" +
" \"valueSchemaMetadata\": {\n" +
" \"schemaId\": \"12345678-1234-1234-1234-123456789012\",\n" +
" \"dataFormat\": \"PROTOBUF\"\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
2 changes: 1 addition & 1 deletion examples/powertools-examples-parameters/sam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
2 changes: 1 addition & 1 deletion examples/powertools-examples-serialization/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
<payloadoffloading-common.version>2.2.0</payloadoffloading-common.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<lambda.core.version>1.2.3</lambda.core.version>
<lambda.events.version>3.15.0</lambda.events.version>
<lambda.events.version>3.16.0</lambda.events.version>
<lambda.serial.version>1.1.5</lambda.serial.version>
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
<aspectj.version>1.9.7</aspectj.version>
Expand Down
Loading
Loading