
Commit 9c0bd41

kbatuigas and JakeSCahill authored and committed
[25.2] Iceberg - JSON Schema support (#1207)
Co-authored-by: Jake Cahill <[email protected]>
1 parent 6fb17a5 commit 9c0bd41

File tree

7 files changed: +136, -29 lines changed


modules/ROOT/nav.adoc

Lines changed: 1 addition & 1 deletion
@@ -170,7 +170,7 @@
 *** xref:manage:whole-cluster-restore.adoc[Whole Cluster Restore]
 ** xref:manage:iceberg/index.adoc[Iceberg]
 *** xref:manage:iceberg/about-iceberg-topics.adoc[About Iceberg Topics]
-*** xref:manage:iceberg/choose-iceberg-mode.adoc[Choose Iceberg Mode]
+*** xref:manage:iceberg/specify-iceberg-schema.adoc[Specify Iceberg Schema]
 *** xref:manage:iceberg/use-iceberg-catalogs.adoc[Use Iceberg Catalogs]
 *** xref:manage:iceberg/query-iceberg-topics.adoc[Query Iceberg Topics]
 *** xref:manage:iceberg/iceberg-topics-databricks-unity.adoc[Query Iceberg Topics with Databricks Unity Catalog]

modules/get-started/pages/release-notes/redpanda.adoc

Lines changed: 4 additions & 0 deletions
@@ -7,6 +7,10 @@ This topic includes new content added in version {page-component-version}. For a
 * xref:redpanda-cloud:get-started:whats-new-cloud.adoc[]
 * xref:redpanda-cloud:get-started:cloud-overview.adoc#redpanda-cloud-vs-self-managed-feature-compatibility[Redpanda Cloud vs Self-Managed feature compatibility]

+== JSON Schema support for Iceberg topics
+
+Redpanda now supports JSON Schema for Iceberg topics. This allows you to use all supported schema types (Protobuf, Avro, and JSON Schema) for Iceberg topics. For more information, see xref:manage:iceberg/specify-iceberg-schema.adoc[].
+
 == Manage SASL users with Kafka APIs

 Redpanda now supports the following Kafka APIs for managing SASL user credentials as described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API[KIP-554^]:

modules/manage/pages/iceberg/choose-iceberg-mode.adoc renamed to modules/manage/pages/iceberg/specify-iceberg-schema.adoc

Lines changed: 104 additions & 11 deletions
@@ -1,7 +1,9 @@
-= Choose an Iceberg Mode
+= Specify Iceberg Schema
 :description: Learn about supported Iceberg modes and how you can integrate schemas with Iceberg topics.
 :page-categories: Iceberg, Tiered Storage, Management, High Availability, Data Replication, Integration
+:page-aliases: manage:iceberg/choose-iceberg-mode.adoc
 :schema-id-val-doc: manage:schema-reg/schema-id-validation.adoc
+
 // tag::single-source[]

 ifndef::env-cloud[]
@@ -11,9 +13,7 @@ include::shared:partial$enterprise-license.adoc[]
 ====
 endif::[]

-In xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg-enabled clusters], the `redpanda.iceberg.mode` topic property determines how Redpanda maps topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table.
-
-NOTE: The JSON Schema format is not supported for Iceberg topics. If your topic data is in JSON, use the `key_value` mode.
+In xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg-enabled clusters], the `redpanda.iceberg.mode` topic property determines how Redpanda maps topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of a schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table.

 == Supported Iceberg modes

@@ -37,7 +37,11 @@ In the xref:manage:schema-reg/schema-reg-overview.adoc#wire-format[Schema Regist

 === value_schema_latest

-Creates an Iceberg table whose structure matches the latest schema registered for the subject in the Schema Registry. You must register a schema in the Schema Registry. Unlike the `value_schema_id_prefix` mode, `value_schema_latest` does not require that producers use the wire format.
+Creates an Iceberg table whose structure matches the latest schema registered for the subject in the Schema Registry. You must register a schema in the Schema Registry.
+
+Producers cannot use the wire format in `value_schema_latest` mode. Redpanda expects the serialized message as-is, without the magic byte or schema ID prefix in the record value.
+
+NOTE: The `value_schema_latest` mode is not compatible with the xref:reference:rpk/rpk-topic/rpk-topic-produce[`rpk topic produce`] command, which embeds the wire format header. You must use your own producer code to produce to topics in `value_schema_latest` mode.

 The latest schema is cached periodically. The cache period is defined by the cluster property `iceberg_latest_schema_cache_ttl_ms` (default: 5 minutes).
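
For reviewers who want to try the new mode, a minimal sketch of tuning the schema cache described above; the 60000 ms value is illustrative, and the property name is the one cited in this change:

[,bash]
----
# Shorten the latest-schema cache TTL (default: 5 minutes) so schema updates
# are picked up faster while testing value_schema_latest
rpk cluster config set iceberg_latest_schema_cache_ttl_ms 60000
----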

@@ -68,7 +72,7 @@ rpk topic create <topic-name> --topic-config=redpanda.iceberg.mode=<iceberg-mode
 .Option 2. Set `redpanda.iceberg.mode` for an existing topic:
 [,bash]
 ----
-rpk topic alter-config <new-topic-name> --set redpanda.iceberg.mode=<iceberg-mode>
+rpk topic alter-config <topic-name> --set redpanda.iceberg.mode=<iceberg-mode>
 ----

 [[override-value-schema-latest-default]]
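
A concrete instance of the corrected commands, with an illustrative topic name and the `value_schema_latest` mode substituted for the placeholders:

[,bash]
----
# Enable an Iceberg mode when the topic is created
rpk topic create ClickEvent --topic-config=redpanda.iceberg.mode=value_schema_latest

# Or switch an existing topic to the same mode
rpk topic alter-config ClickEvent --set redpanda.iceberg.mode=value_schema_latest
----
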
@@ -93,8 +97,14 @@ value_schema_latest:subject=<subject-name>,protobuf_name=<protobuf-message-full-

 * For both Avro and Protobuf, specify a different subject name by using the key-value pair `subject=<subject-name>`, for example `value_schema_latest:subject=sensor-data`.
 * For Protobuf only:
-** Specify a different message definition by using a key-value pair `protobuf_name=<message-name>`, for example: `value_schema_latest:protobuf_name=com.example.manufacturing.SensorData`.
++
+--
+** Specify a different message definition by using a key-value pair `protobuf_name=<message-full-name>`. You must use the fully qualified name, which includes the package name, for example, `value_schema_latest:protobuf_name=com.example.manufacturing.SensorData`.
 ** To specify both a different subject and message definition, separate the key-value pairs with a comma, for example: `value_schema_latest:subject=my_protobuf_schema,protobuf_name=com.example.manufacturing.SensorData`.
+--
++
+NOTE: If you don't specify the fully qualified Protobuf message name, Redpanda pauses the data translation to the Iceberg table until you fix the topic misconfiguration.
+

 == How Iceberg modes translate to table format
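
A hedged sketch of the override syntax shown in this hunk; the topic name `SensorData` is an assumption, and the subject and Protobuf message names are taken from the examples above:

[,bash]
----
# Point value_schema_latest at a specific subject and a fully qualified Protobuf message
rpk topic alter-config SensorData --set "redpanda.iceberg.mode=value_schema_latest:subject=my_protobuf_schema,protobuf_name=com.example.manufacturing.SensorData"
----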

@@ -140,7 +150,7 @@ CREATE TABLE ClickEvent (
 )
 ----

-Use `key_value` mode if the topic data is in JSON or if you are able to use the Iceberg data in its semi-structured format.
+Use `key_value` mode if you want to use the Iceberg data in its semi-structured format.

 The `value_schema_id_prefix` and `value_schema_latest` modes can use the schema to translate to the following table format:

@@ -162,7 +172,7 @@ CREATE TABLE ClickEvent (

 As you produce records to the topic, the data also becomes available in object storage for Iceberg-compatible clients to consume. You can use the same analytical tools to xref:manage:iceberg/query-iceberg-topics.adoc[read the Iceberg topic data] in a data lake as you would for a relational database.

-If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue] for more information.
+If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See xref:manage:iceberg/about-iceberg-topics.adoc#troubleshoot-errors[Troubleshoot errors] for more information.

 === Schema types translation

@@ -185,7 +195,7 @@ Avro::
 | string | string
 | record | struct
 | array | list
-| maps | map
+| map | map
 | fixed | fixed*
 | decimal | decimal
 | uuid | uuid*
@@ -234,14 +244,97 @@ Protobuf::

 There are some cases where the Protobuf type does not map directly to an Iceberg type and Redpanda applies the following transformations:

-* Repeated values are translated into Iceberg `array` types.
+* Repeated values are translated into Iceberg `list` types.
 * Enums are translated into Iceberg `int` types based on the integer value of the enumerated type.
 * `uint32` and `fixed32` are translated into Iceberg `long` types as that is the existing semantic for unsigned 32-bit values in Iceberg.
 * `uint64` and `fixed64` values are translated into their Base-10 string representation.
 * `google.protobuf.Timestamp` is translated into `timestamp` in Iceberg.

 Recursive types are not supported.
 --
+
+JSON Schema::
++
+--
+Requirements:
+
+- Only JSON Schema Draft-07 is currently supported.
+- You must declare the JSON Schema dialect using the `$schema` keyword, for example `"$schema": "http://json-schema.org/draft-07/schema#"`.
+- You must use a JSON Schema that constrains JSON documents to a strict type in order for Redpanda to translate to Iceberg; that is, each subschema must use the `type` keyword.
+
+.Valid JSON Schema example
+[,json]
+----
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "properties": {
+    "productId": {
+      "type": "integer"
+    },
+    "tags": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      }
+    }
+  }
+}
+----
+
+|===
+| JSON type | Iceberg type | Notes
+
+| array
+| list
+| The keywords `items` and `additionalItems` must be used to constrain element types.
+
+| boolean
+| boolean
+|
+
+| null
+|
+| The `null` type is not supported except when it is paired with another type to indicate nullability.
+
+| number
+| double
+|
+
+| integer
+| long
+|
+
+| string
+| string
+| The `format` keyword can be used for custom Iceberg types. See <<format-translation,`format` annotation translation>> for details.
+
+| object
+| struct
+| The `properties` keyword must be used to define `struct` fields and constrain their types. The `additionalProperties` keyword is accepted only when it is set to `false`.
+
+|===
+
+[[format-translation]]
+.`format` annotation translation
+|===
+| `format` value | Iceberg type
+
+| date-time | timestamptz
+| date | date
+| time | time
+
+|===
+
+The following are not supported for JSON Schema:
+
+* Relative and absolute (including external) references using `$ref` and `$dynamicRef` keywords
+* The `default` keyword
+* Conditional typing (`if`, `then`, `else`, `dependent` keywords)
+* Boolean JSON Schema combinations (`allOf`, `anyOf`, `oneOf` keywords)
+* Dynamic object members (`patternProperties` and `additionalProperties` (except when it is set to `false`) keywords)
+--
+
 ======

 // end::single-source[]
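
To show the new JSON Schema path end to end, a minimal sketch that registers the valid Draft-07 example above and creates an Iceberg topic. The subject `products-value`, topic `products`, and file name `product.json` are assumptions:

[,bash]
----
# Save the valid Draft-07 example above as product.json, then register it
# under the topic's value subject
rpk registry schema create products-value --schema product.json --type json

# Create the Iceberg topic; its table structure follows the latest registered schema
rpk topic create products --topic-config=redpanda.iceberg.mode=value_schema_latest
----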

modules/manage/partials/iceberg/about-iceberg-topics.adoc

Lines changed: 23 additions & 13 deletions
@@ -52,7 +52,7 @@ endif::[]

 * It is not possible to append topic data to an existing Iceberg table that is not created by Redpanda.
 * If you enable the Iceberg integration on an existing Redpanda topic, Redpanda does not backfill the generated Iceberg table with topic data.
-* JSON schemas are not currently supported. If the topic data is in JSON, use the `key_value` mode to store the JSON in Iceberg, which then can be parsed by most query engines.
+* JSON schemas are supported starting with Redpanda version 25.2.

 == Enable Iceberg integration

@@ -231,7 +231,7 @@ Data in an Iceberg-enabled topic is consumable from Kafka based on the configure

 == Schema evolution

-Redpanda supports schema evolution for Avro and Protobuf schemas in accordance with the https://iceberg.apache.org/spec/#schema-evolution[Iceberg specification^]. Permitted schema evolutions include reordering fields and promoting field types. When you update the schema in Schema Registry, Redpanda automatically updates the Iceberg table schema to match the new schema.
+Redpanda supports schema evolution in accordance with the https://iceberg.apache.org/spec/#schema-evolution[Iceberg specification^]. Permitted schema evolutions include reordering fields and promoting field types. When you update the schema in Schema Registry, Redpanda automatically updates the Iceberg table schema to match the new schema.

 For example, if you produce records to a topic `demo-topic` with the following Avro schema:
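
Separately from the Avro example that the file goes on to show, a hedged sketch of how you would trigger the evolution described here by registering a new schema version under the same subject; the subject name `demo-topic-value` and file name `demo-topic-v2.avsc` are assumptions:

[,bash]
----
# Register the updated Avro schema as a new version of the existing subject;
# Redpanda then updates the Iceberg table schema to match
rpk registry schema create demo-topic-value --schema demo-topic-v2.avsc --type avro
----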

@@ -310,22 +310,21 @@ Querying the Iceberg table for `demo-topic` includes the new column `ts`:
 +---------+--------------+--------------------------+
 ----

-== Manage dead-letter queue
+== Troubleshoot errors

-Errors may occur when translating records in the `value_schema_id_prefix` mode to the Iceberg table format; for example, if you do not use the Schema Registry wire format with the magic byte, if the schema ID in the record is not found in the Schema Registry, or if an Avro or Protobuf data type cannot be translated to an Iceberg type.
+If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda by default writes the record to a separate dead-letter queue (DLQ) Iceberg table named `<topic-name>~dlq`. The following can cause errors to occur when translating records in the `value_schema_id_prefix` and `value_schema_latest` modes to the Iceberg table format:

-ifndef::env-cloud[]
-If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda writes the record to a separate dead-letter queue (DLQ) Iceberg table named `<topic-name>~dlq`. To disable the default behavior for a topic and drop the record, set the xref:reference:properties/topic-properties.adoc#redpanda-iceberg-invalid-record-action[`redpanda.iceberg.invalid.record.action`] topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
-endif::[]
-ifdef::env-cloud[]
-If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda writes the record to a separate dead-letter queue (DLQ) Iceberg table named `<topic-name>~dlq`. To disable the default behavior for a topic and drop the record, set the `redpanda.iceberg.invalid.record.action` topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
-endif::[]
+- Redpanda cannot find the embedded schema ID in the Schema Registry.
+- Redpanda fails to translate one or more schema data types to an Iceberg type.
+- In `value_schema_id_prefix` mode, you do not use the Schema Registry wire format with the magic byte.

 The DLQ table itself uses the `key_value` schema, consisting of two columns: the record metadata including the key, and a binary column for the record's value.

-You can inspect the DLQ table for records that failed to write to the Iceberg table, and you can take further action on these records, such as transforming and reprocessing them, or debugging issues that occurred upstream.
+NOTE: Topic property misconfiguration, such as xref:manage:iceberg/specify-iceberg-schema.adoc#override-value-schema-latest-default[overriding the default behavior of `value_schema_latest` mode] but not specifying the fully qualified Protobuf message name, does not cause records to be written to the DLQ table. Instead, Redpanda pauses the topic data translation to the Iceberg table until you fix the misconfiguration.

-=== Reprocess DLQ records
+=== Inspect DLQ table
+
+You can inspect the DLQ table for records that failed to write to the Iceberg table, and you can take further action on these records, such as transforming and reprocessing them, or debugging issues that occurred upstream.

 The following example produces a record to a topic named `ClickEvent` and does not use the Schema Registry wire format that includes the magic byte and schema ID:

@@ -356,7 +355,9 @@ FROM <catalog-name>."ClickEvent~dlq"; -- Fully qualified table name
 +-------------------------------------------------+
 ----

-The data is in binary format, and the first byte is not `0x00`, indicating that it was not produced with a schema.
+The data is in binary format, and the first byte is not `0x00`, indicating that it was not produced with a schema.
+
+=== Reprocess DLQ records

 You can apply a transformation and reprocess the record in your data lakehouse to the original Iceberg table. In this case, you have a JSON value represented as a UTF-8 binary. Depending on your query engine, you might need to decode the binary value first before extracting the JSON fields. Some engines may automatically decode the binary value for you:

@@ -385,6 +386,15 @@ FROM (

 You can now insert the transformed record back into the main Iceberg table. Redpanda recommends employing a strategy for exactly-once processing to avoid duplicates when reprocessing records.

+=== Drop invalid records
+
+ifndef::env-cloud[]
+To disable the default behavior and drop an invalid record, set the xref:reference:properties/topic-properties.adoc#redpanda-iceberg-invalid-record-action[`redpanda.iceberg.invalid.record.action`] topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
+endif::[]
+ifdef::env-cloud[]
+To disable the default behavior and drop an invalid record, set the `redpanda.iceberg.invalid.record.action` topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
+endif::[]
+
 == Performance considerations

 When you enable Iceberg for any substantial workload and start translating topic data to the Iceberg format, you may see most of your cluster's CPU utilization increase. If this additional workload overwhelms the brokers and causes the Iceberg table lag to exceed the configured target lag, Redpanda automatically applies backpressure to producers to prevent Iceberg tables from lagging further. This ensures that Iceberg tables keep up with the volume of incoming data, but sacrifices ingress throughput of the cluster.
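
A minimal sketch of the configuration described in the added `=== Drop invalid records` section, using the `ClickEvent` topic from the surrounding examples:

[,bash]
----
# Drop invalid records for one topic instead of writing them to the DLQ table
rpk topic alter-config ClickEvent --set redpanda.iceberg.invalid.record.action=drop

# Or change the cluster-wide default for all Iceberg topics
rpk cluster config set iceberg_invalid_record_action drop
----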

modules/manage/partials/iceberg/query-iceberg-topics.adoc

Lines changed: 2 additions & 2 deletions
@@ -77,7 +77,7 @@ endif::[]

 === Topic with schema (`value_schema_id_prefix` mode)

-NOTE: The steps in this section also apply to the `value_schema_latest` mode, except for step 2. The `value_schema_latest` mode doesn't require the Schema Registry wire format, so you'll use your own producer code instead of xref:reference:rpk/rpk-topic/rpk-topic-produce[`rpk topic produce`].
+NOTE: The steps in this section also apply to the `value_schema_latest` mode, except the produce step. The `value_schema_latest` mode is not compatible with the Schema Registry wire format. The xref:reference:rpk/rpk-topic/rpk-topic-produce[`rpk topic produce`] command embeds the wire format header, so you must use your own producer code with `value_schema_latest`.

 Assume that you have created the `ClickEvent` topic, set `redpanda.iceberg.mode` to `value_schema_id_prefix`, and are connecting to a REST-based Iceberg catalog. The following is an Avro schema for `ClickEvent`:

@@ -139,7 +139,7 @@ In this example, assume that you have created the `ClickEvent_key_value` topic,
 +
 [,bash]
 ----
-echo 'key1 {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n'
+echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n'
 ----

 . The following Spark SQL query returns the semi-structured data in the `ClickEvent_key_value` table. The table consists of two columns: one named `redpanda`, containing the record key and other metadata, and another binary column named `value` for the record's value:
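
To sanity-check the corrected produce command, a hedged follow-up that reads one record back with the same format string; the consume flags are assumed from standard `rpk topic consume` usage:

[,bash]
----
# Read back one record to confirm the quoted key and JSON value landed as expected
rpk topic consume ClickEvent_key_value --format '%k %v\n' -n 1
----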

modules/reference/pages/properties/cluster-properties.adoc

Lines changed: 1 addition & 1 deletion
@@ -2179,7 +2179,7 @@ endif::[]

 **Related topics**:

-- xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue]
+- xref:manage:iceberg/about-iceberg-topics.adoc#troubleshoot-errors[Troubleshoot errors]

 ---

modules/reference/pages/properties/topic-properties.adoc

Lines changed: 1 addition & 1 deletion
@@ -611,7 +611,7 @@ Whether to write invalid records to a dead-letter queue (DLQ).

 **Related topics**:

-- xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue]
+- xref:manage:iceberg/about-iceberg-topics.adoc#troubleshoot-errors[Troubleshoot errors]

 ---
