Merged · 20 commits
2 changes: 1 addition & 1 deletion modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
@@ -170,7 +170,7 @@
*** xref:manage:whole-cluster-restore.adoc[Whole Cluster Restore]
** xref:manage:iceberg/index.adoc[Iceberg]
*** xref:manage:iceberg/about-iceberg-topics.adoc[About Iceberg Topics]
*** xref:manage:iceberg/choose-iceberg-mode.adoc[Choose Iceberg Mode]
*** xref:manage:iceberg/specify-iceberg-schema.adoc[Specify Iceberg Schema]
*** xref:manage:iceberg/use-iceberg-catalogs.adoc[Use Iceberg Catalogs]
*** xref:manage:iceberg/query-iceberg-topics.adoc[Query Iceberg Topics]
*** xref:manage:iceberg/iceberg-topics-databricks-unity.adoc[Query Iceberg Topics with Databricks Unity Catalog]
4 changes: 4 additions & 0 deletions modules/get-started/pages/release-notes/redpanda.adoc
@@ -7,6 +7,10 @@ This topic includes new content added in version {page-component-version}. For a
* xref:redpanda-cloud:get-started:whats-new-cloud.adoc[]
* xref:redpanda-cloud:get-started:cloud-overview.adoc#redpanda-cloud-vs-self-managed-feature-compatibility[Redpanda Cloud vs Self-Managed feature compatibility]

== JSON Schema support for Iceberg topics

Redpanda now supports JSON Schema for Iceberg topics. This allows you to use all supported schema types (Protobuf, Avro, and JSON Schema) for Iceberg topics. For more information, see xref:manage:iceberg/specify-iceberg-schema.adoc[].
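
For example, you could register a JSON Schema and then create an Iceberg-enabled topic that uses it. This is a hedged sketch: the subject name `orders-value`, the file `orders.json`, and the topic `orders` are placeholders, not from the release notes.

[,bash]
----
# Register a JSON Schema for the topic's value subject (hypothetical names)
rpk registry schema create orders-value --schema orders.json --type json

# Create a topic whose Iceberg table follows the latest registered schema
rpk topic create orders --topic-config redpanda.iceberg.mode=value_schema_latest
----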

== Manage SASL users with Kafka APIs

Redpanda now supports the following Kafka APIs for managing SASL user credentials as described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API[KIP-554^]:
@@ -1,7 +1,9 @@
= Choose an Iceberg Mode
= Specify Iceberg Schema
:description: Learn about supported Iceberg modes and how you can integrate schemas with Iceberg topics.
:page-categories: Iceberg, Tiered Storage, Management, High Availability, Data Replication, Integration
:page-aliases: manage:iceberg/choose-iceberg-mode.adoc
:schema-id-val-doc: manage:schema-reg/schema-id-validation.adoc

// tag::single-source[]

ifndef::env-cloud[]
@@ -11,9 +13,7 @@ include::shared:partial$enterprise-license.adoc[]
====
endif::[]

In xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg-enabled clusters], the `redpanda.iceberg.mode` topic property determines how Redpanda maps topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table.

NOTE: The JSON Schema format is not supported for Iceberg topics. If your topic data is in JSON, use the `key_value` mode.
In xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg-enabled clusters], the `redpanda.iceberg.mode` topic property determines how Redpanda maps topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of a schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table.

== Supported Iceberg modes

@@ -37,7 +37,11 @@ In the xref:manage:schema-reg/schema-reg-overview.adoc#wire-format[Schema Regist

=== value_schema_latest

Creates an Iceberg table whose structure matches the latest schema registered for the subject in the Schema Registry. You must register a schema in the Schema Registry. Unlike the `value_schema_id_prefix` mode, `value_schema_latest` does not require that producers use the wire format.
Creates an Iceberg table whose structure matches the latest schema registered for the subject in the Schema Registry. You must register a schema in the Schema Registry.

Producers cannot use the wire format in `value_schema_latest` mode. Redpanda expects the serialized message as-is without the magic byte or schema ID prefix in the record value.

NOTE: The `value_schema_latest` mode is not compatible with the xref:reference:rpk/rpk-topic/rpk-topic-produce.adoc[`rpk topic produce`] command, which embeds the wire format header. You must use your own producer code to produce to topics in `value_schema_latest` mode.
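
The wire format mentioned above is a fixed framing: a `0x00` magic byte, a 4-byte big-endian schema ID, then the serialized payload. The following Python sketch (illustrative only, not Redpanda code) shows that framing, and therefore exactly what producers must omit in `value_schema_latest` mode:

[,python]
----
import struct

def add_wire_format(schema_id: int, payload: bytes) -> bytes:
    """Prefix a serialized payload with the Schema Registry wire-format header."""
    return b"\x00" + struct.pack(">I", schema_id) + payload

def strip_wire_format(buf: bytes) -> tuple[int, bytes]:
    """Split a wire-format message into (schema_id, payload)."""
    if len(buf) < 5 or buf[0] != 0x00:
        raise ValueError("not wire format: missing 0x00 magic byte")
    (schema_id,) = struct.unpack(">I", buf[1:5])
    return schema_id, buf[5:]
----

In `value_schema_id_prefix` mode, produce the full framed message; in `value_schema_latest` mode, produce only the raw payload bytes.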

The latest schema is cached periodically. The cache period is defined by the cluster property `iceberg_latest_schema_cache_ttl_ms` (default: 5 minutes).
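
For example, you can lower the TTL with a cluster configuration command. This is a sketch; the value shown is arbitrary, and the property takes milliseconds:

[,bash]
----
# Cache the latest schema for 60 seconds instead of the 5-minute default
rpk cluster config set iceberg_latest_schema_cache_ttl_ms 60000
----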

@@ -58,7 +62,7 @@ rpk topic create <topic-name> --topic-config=redpanda.iceberg.mode=<iceberg-mode
.Option 2. Set `redpanda.iceberg.mode` for an existing topic:
[,bash]
----
rpk topic alter-config <new-topic-name> --set redpanda.iceberg.mode=<iceberg-mode>
rpk topic alter-config <topic-name> --set redpanda.iceberg.mode=<iceberg-mode>
----

[[override-value-schema-latest-default]]
@@ -83,8 +87,14 @@ value_schema_latest:subject=<subject-name>,protobuf_name=<protobuf-message-full-

* For both Avro and Protobuf, specify a different subject name by using the key-value pair `subject=<subject-name>`, for example `value_schema_latest:subject=sensor-data`.
* For Protobuf only:
** Specify a different message definition by using a key-value pair `protobuf_name=<message-name>`, for example: `value_schema_latest:protobuf_name=com.example.manufacturing.SensorData`.
+
--
** Specify a different message definition by using a key-value pair `protobuf_name=<message-full-name>`. You must use the fully qualified name, which includes the package name, for example, `value_schema_latest:protobuf_name=com.example.manufacturing.SensorData`.
** To specify both a different subject and message definition, separate the key-value pairs with a comma, for example: `value_schema_latest:subject=my_protobuf_schema,protobuf_name=com.example.manufacturing.SensorData`.
--
+
NOTE: If you don't specify the fully qualified Protobuf message name, Redpanda pauses the data translation to the Iceberg table until you fix the topic misconfiguration.
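
Putting the override syntax together, the following sketch creates a topic whose table follows the latest schema for a non-default subject and a fully qualified Protobuf message. All names here are hypothetical:

[,bash]
----
rpk topic create sensor-readings \
  --topic-config 'redpanda.iceberg.mode=value_schema_latest:subject=sensor-data,protobuf_name=com.example.manufacturing.SensorData'
----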


== How Iceberg modes translate to table format

@@ -130,7 +140,7 @@ CREATE TABLE ClickEvent (
)
----

Use `key_value` mode if the topic data is in JSON or if you are able to use the Iceberg data in its semi-structured format.
Use `key_value` mode if you want to use the Iceberg data in its semi-structured format.

The `value_schema_id_prefix` and `value_schema_latest` modes can use the schema to translate to the following table format:

@@ -152,7 +162,7 @@

As you produce records to the topic, the data also becomes available in object storage for Iceberg-compatible clients to consume. You can use the same analytical tools to xref:manage:iceberg/query-iceberg-topics.adoc[read the Iceberg topic data] in a data lake as you would for a relational database.

If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue] for more information.
If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See xref:manage:iceberg/about-iceberg-topics.adoc#troubleshoot-errors[Troubleshoot errors] for more information.

=== Schema types translation

@@ -175,7 +185,7 @@ Avro::
| string | string
| record | struct
| array | list
| maps | map
| map | map
| fixed | fixed*
| decimal | decimal
| uuid | uuid*
@@ -224,14 +234,97 @@ Protobuf::

There are some cases where the Protobuf type does not map directly to an Iceberg type and Redpanda applies the following transformations:

* Repeated values are translated into Iceberg `array` types.
* Repeated values are translated into Iceberg `list` types.
* Enums are translated into Iceberg `int` types based on the integer value of the enumerated type.
* `uint32` and `fixed32` are translated into Iceberg `long` types, which is how Iceberg represents unsigned 32-bit values.
* `uint64` and `fixed64` values are translated into their Base-10 string representation.
* `google.protobuf.Timestamp` is translated into `timestamp` in Iceberg.

Recursive types are not supported.
--

JSON Schema::
+
--
Requirements:

- Only JSON Schema Draft-07 is currently supported.
- You must declare the JSON Schema dialect using the `$schema` keyword, for example `"$schema": "http://json-schema.org/draft-07/schema#"`.
- The JSON Schema must constrain JSON documents to a strict type so that Redpanda can translate them to Iceberg; that is, each subschema must use the `type` keyword.

.Valid JSON Schema example
[,json]
----
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "productId": {
      "type": "integer"
    },
    "tags": {
      "type": "array",
      "items": {
        "type": "string"
      }
    }
  }
}
----

|===
| JSON type | Iceberg type | Notes

| array
| list
| The keywords `items` and `additionalItems` must be used to constrain element types.

| boolean
| boolean
|

| null
|
| The `null` type is not supported except when it is paired with another type to indicate nullability.

| number
| double
|

| integer
| long
|

| string
| string
| The `format` keyword can be used for custom Iceberg types. See <<format-translation,`format` annotation translation>> for details.

| object
| struct
| The `properties` keyword must be used to define `struct` fields and constrain their types. The `additionalProperties` keyword is accepted only when it is set to `false`.

|===

[[format-translation]]
.`format` annotation translation
|===
| `format` value | Iceberg type

| date-time | timestamptz
| date | date
| time | time

|===
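
For example, the following schema sketch (the field name is hypothetical) yields an Iceberg `timestamptz` column for `created_at`:

[,json]
----
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "created_at": {
      "type": "string",
      "format": "date-time"
    }
  }
}
----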

The following are not supported for JSON Schema:

* Relative and absolute (including external) references using `$ref` and `$dynamicRef` keywords
* The `default` keyword
* Conditional typing (`if`, `then`, `else`, `dependent` keywords)
* Boolean JSON Schema combinations (`allOf`, `anyOf`, `oneOf` keywords)
* Dynamic object members: the `patternProperties` keyword, and the `additionalProperties` keyword unless it is set to `false`
--

======

// end::single-source[]
36 changes: 23 additions & 13 deletions modules/manage/partials/iceberg/about-iceberg-topics.adoc
@@ -52,7 +52,7 @@ endif::[]

* It is not possible to append topic data to an existing Iceberg table that is not created by Redpanda.
* If you enable the Iceberg integration on an existing Redpanda topic, Redpanda does not backfill the generated Iceberg table with topic data.
* JSON schemas are not currently supported. If the topic data is in JSON, use the `key_value` mode to store the JSON in Iceberg, which then can be parsed by most query engines.
* JSON Schema is supported starting with Redpanda version 25.2.

== Enable Iceberg integration

@@ -233,7 +233,7 @@ Data in an Iceberg-enabled topic is consumable from Kafka based on the configure

== Schema evolution

Redpanda supports schema evolution for Avro and Protobuf schemas in accordance with the https://iceberg.apache.org/spec/#schema-evolution[Iceberg specification^]. Permitted schema evolutions include reordering fields and promoting field types. When you update the schema in Schema Registry, Redpanda automatically updates the Iceberg table schema to match the new schema.
Redpanda supports schema evolution in accordance with the https://iceberg.apache.org/spec/#schema-evolution[Iceberg specification^]. Permitted schema evolutions include reordering fields and promoting field types. When you update the schema in Schema Registry, Redpanda automatically updates the Iceberg table schema to match the new schema.

For example, if you produce records to a topic `demo-topic` with the following Avro schema:

@@ -312,22 +312,21 @@ Querying the Iceberg table for `demo-topic` includes the new column `ts`:
+---------+--------------+--------------------------+
----

== Manage dead-letter queue
== Troubleshoot errors

Errors may occur when translating records in the `value_schema_id_prefix` mode to the Iceberg table format; for example, if you do not use the Schema Registry wire format with the magic byte, if the schema ID in the record is not found in the Schema Registry, or if an Avro or Protobuf data type cannot be translated to an Iceberg type.
If Redpanda encounters an error while writing a record to the Iceberg table, by default it writes the record to a separate dead-letter queue (DLQ) Iceberg table named `<topic-name>~dlq`. The following can cause translation errors in the `value_schema_id_prefix` and `value_schema_latest` modes:

ifndef::env-cloud[]
If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda writes the record to a separate dead-letter queue (DLQ) Iceberg table named `<topic-name>~dlq`. To disable the default behavior for a topic and drop the record, set the xref:reference:properties/topic-properties.adoc#redpanda-iceberg-invalid-record-action[`redpanda.iceberg.invalid.record.action`] topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
endif::[]
ifdef::env-cloud[]
If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda writes the record to a separate dead-letter queue (DLQ) Iceberg table named `<topic-name>~dlq`. To disable the default behavior for a topic and drop the record, set the `redpanda.iceberg.invalid.record.action` topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
endif::[]
- Redpanda cannot find the embedded schema ID in the Schema Registry.
- Redpanda fails to translate one or more schema data types to an Iceberg type.
- In `value_schema_id_prefix` mode, you do not use the Schema Registry wire format with the magic byte.

The DLQ table itself uses the `key_value` schema, consisting of two columns: the record metadata including the key, and a binary column for the record's value.

You can inspect the DLQ table for records that failed to write to the Iceberg table, and you can take further action on these records, such as transforming and reprocessing them, or debugging issues that occurred upstream.
NOTE: Topic property misconfiguration, such as xref:manage:iceberg/specify-iceberg-schema.adoc#override-value-schema-latest-default[overriding the default behavior of `value_schema_latest` mode] but not specifying the fully qualified Protobuf message name, does not cause records to be written to the DLQ table. Instead, Redpanda pauses the topic data translation to the Iceberg table until you fix the misconfiguration.

=== Reprocess DLQ records
=== Inspect DLQ table

You can inspect the DLQ table for records that failed to write to the Iceberg table, and you can take further action on these records, such as transforming and reprocessing them, or debugging issues that occurred upstream.

The following example produces a record to a topic named `ClickEvent` and does not use the Schema Registry wire format that includes the magic byte and schema ID:

@@ -358,7 +357,9 @@ FROM <catalog-name>."ClickEvent~dlq"; -- Fully qualified table name
+-------------------------------------------------+
----

The data is in binary format, and the first byte is not `0x00`, indicating that it was not produced with a schema.

=== Reprocess DLQ records

You can apply a transformation and reprocess the record in your data lakehouse to the original Iceberg table. In this case, you have a JSON value represented as a UTF-8 binary. Depending on your query engine, you might need to decode the binary value first before extracting the JSON fields. Some engines may automatically decode the binary value for you:

@@ -387,6 +388,15 @@ FROM (

You can now insert the transformed record back into the main Iceberg table. Redpanda recommends employing a strategy for exactly-once processing to avoid duplicates when reprocessing records.

=== Drop invalid records

ifndef::env-cloud[]
To disable the default behavior and drop an invalid record, set the xref:reference:properties/topic-properties.adoc#redpanda-iceberg-invalid-record-action[`redpanda.iceberg.invalid.record.action`] topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
endif::[]
ifdef::env-cloud[]
To disable the default behavior and drop an invalid record, set the `redpanda.iceberg.invalid.record.action` topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property.
endif::[]
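
For example (the topic name is a placeholder):

[,bash]
----
# Drop invalid records for one topic instead of writing them to the DLQ table
rpk topic alter-config orders --set redpanda.iceberg.invalid.record.action=drop

# Or change the cluster-wide default for all Iceberg topics
rpk cluster config set iceberg_invalid_record_action drop
----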

== Performance considerations

When you enable Iceberg for any substantial workload and start translating topic data to the Iceberg format, you may see most of your cluster's CPU utilization increase. If this additional workload overwhelms the brokers and causes the Iceberg table lag to exceed the configured target lag, Redpanda automatically applies backpressure to producers to prevent Iceberg tables from lagging further. This ensures that Iceberg tables keep up with the volume of incoming data, but sacrifices ingress throughput of the cluster.
4 changes: 2 additions & 2 deletions modules/manage/partials/iceberg/query-iceberg-topics.adoc
@@ -77,7 +77,7 @@ endif::[]

=== Topic with schema (`value_schema_id_prefix` mode)

NOTE: The steps in this section also apply to the `value_schema_latest` mode, except for step 2. The `value_schema_latest` mode doesn't require the Schema Registry wire format, so you'll use your own producer code instead of xref:reference:rpk/rpk-topic/rpk-topic-produce[`rpk topic produce`].
NOTE: The steps in this section also apply to the `value_schema_latest` mode, except the produce step. The `value_schema_latest` mode is not compatible with the Schema Registry wire format. The xref:reference:rpk/rpk-topic/rpk-topic-produce.adoc[`rpk topic produce`] command embeds the wire format header, so you must use your own producer code with `value_schema_latest`.

@r-vasquez I think it would be really great if you could produce NOT using the wire format always. A lot of our customers don't use it, so having be a mode in your profile where you can choose how RPK produces (and maybe consumes) would help customers a lot who don't use the wire format.


Assume that you have created the `ClickEvent` topic, set `redpanda.iceberg.mode` to `value_schema_id_prefix`, and are connecting to a REST-based Iceberg catalog. The following is an Avro schema for `ClickEvent`:

@@ -139,7 +139,7 @@ In this example, assume that you have created the `ClickEvent_key_value` topic,
+
[,bash]
----
echo 'key1 {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n'
echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n'
----

. The following Spark SQL query returns the semi-structured data in the `ClickEvent_key_value` table. The table consists of two columns: one named `redpanda`, containing the record key and other metadata, and another binary column named `value` for the record's value:
2 changes: 1 addition & 1 deletion modules/reference/pages/properties/cluster-properties.adoc
@@ -2059,7 +2059,7 @@ endif::[]

**Related topics**:

- xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue]
- xref:manage:iceberg/about-iceberg-topics.adoc#troubleshoot-errors[Troubleshoot errors]

---

2 changes: 1 addition & 1 deletion modules/reference/pages/properties/topic-properties.adoc
Expand Up @@ -611,7 +611,7 @@ Whether to write invalid records to a dead-letter queue (DLQ).

**Related topics**:

- xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue]
- xref:manage:iceberg/about-iceberg-topics.adoc#troubleshoot-errors[Troubleshoot errors]

---
