299 changes: 299 additions & 0 deletions docs/integrate/etl/iceberg-risingwave.md
> **@amotl (Member), Jun 5, 2025:** I'd recommend using the new layout `docs/integrate/risingwave/iceberg.md`, but I can also apply the refactoring later.

> **@amotl (Member), Jun 5, 2025:** With the new layout regime, `docs/integrate/risingwave/index.md` would then include a little introduction to RisingWave itself, like the product cards previously presented at https://cratedb.com/docs/crate/clients-tools/en/latest/integrate/etl.html ff., and then guide to individual tutorials like yours, slotted into the physical tree right next to it.
>
> As I've mentioned above, we can easily do it in a subsequent iteration.
>
> /cc @bmunkholm

> **@amotl (Member), Jun 5, 2025:** FYI: I am already working on a patch to support your document, so you don't need to take care of this task.

> **Member:** That's the quick patch to add an index page.

> **Member:** I can easily adjust yours to follow that logic, then squash and merge? Or do you want to continue working on the document?

> **Member:** @hlcianfagna: Please merge the patch at your disposal to bring it in. I can add the relocation of the document later.

> **Contributor Author:** @amotl, so will you update `/integrate/risingwave/index.html` later, or should I patch it here?

> **@amotl (Member), Jun 6, 2025:** You can also do it any time, please go ahead. Today I will only be at the keyboard later.

@@ -0,0 +1,299 @@
(iceberg-risingwave)=

# Stream processing from Iceberg tables to CrateDB using RisingWave

[RisingWave] is a stream processing platform that allows configuring data
sources, views on that data, and destinations where results are materialized.

This guide aims to show you an example with data coming from an Iceberg table
and aggregations materialized in real-time in CrateDB.

## Environment setup

For this example, we will spin up three containers using [Podman].

Let's first start a [Minio] instance:

```bash
podman run -d --name minio -p 9000:9000 -p 9001:9001 \
  -e MINIO_ROOT_USER=minioadmin \
  -e MINIO_ROOT_PASSWORD=minioadmin \
  quay.io/minio/minio server /data --console-address ":9001"
```

Now let's create a bucket called `warehouse`. To do so, point a browser at
`http://localhost:9001`, log in with `minioadmin`/`minioadmin`, click
"Create bucket", name it `warehouse`, and click "Create bucket" again.

Then we will spin up an instance of RisingWave:

```bash
podman run -d --name risingwave -it -p 4566:4566 -p 5691:5691 docker.io/risingwavelabs/risingwave:v2.4.0 single_node
```

And finally, an instance of CrateDB:

```bash
podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:5.10.7 -Cdiscovery.type=single-node
```

> **Member:** Maybe use `latest` here?
>
> ```suggestion
> podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:latest -Cdiscovery.type=single-node
> ```
>
> **Contributor Author:** What do we usually do? The idea here was to pin a version that I know worked, but we are not doing anything special that we expect to break in a future version.
>
> **@amotl (Member), Jun 6, 2025:** Yeah, in some tutorials we conservatively use `latest`, and in others even `nightly`. We gain confidence by accompanying the corresponding tutorials with software tests that run daily. Another medium-conservative option would be to refer to a minor release on the GA channel (skipping patch releases), e.g. `crate:5.10`.

We will need three terminals for this demonstration.

On the first terminal, we will use [PyIceberg] and [IPython] to create an Iceberg
table, and later we will add data and see how aggregations materialize in
CrateDB in real-time.

On the second terminal, we will set up RisingWave and CrateDB, and we will
leave a Python script running to stream the changes.

And on the third terminal, we will review how data appears in CrateDB.

## Creating an Iceberg table

Let's start on the first terminal. We use a Python script to create an Iceberg
table in the bucket we created earlier on Minio, and as we want to keep things
simple, we will use an ephemeral in-memory catalog.

```bash
pip install pyiceberg pyarrow s3fs
ipython3
```

```python
from datetime import datetime

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, LongType, NestedField, TimestampType

# Ephemeral in-memory catalog backed by the MinIO bucket created earlier.
catalog = SqlCatalog(
    "default",
    **{
        "uri": "sqlite:///:memory:",
        "warehouse": "s3://warehouse",
        "s3.endpoint": "http://localhost:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
    },
)

schema = Schema(
    NestedField(field_id=1, name="sensor_id", field_type=LongType()),
    NestedField(field_id=2, name="ts", field_type=TimestampType()),
    NestedField(field_id=3, name="reading", field_type=DoubleType()),
)

catalog.create_namespace("db")

table = catalog.create_table(
    identifier="db.sensors_readings", schema=schema, properties={"format-version": "2"}
)


def create_risingwave_compatible_metadata(table, version):
    """Write a version-hint.text file and a v<N>.metadata.json copy of the
    current metadata file, so the table can be read without a catalog."""
    metadata_location = table.metadata_location
    metadata_dir = metadata_location.rsplit("/", 1)[0]
    version_hint_path = f"{metadata_dir}/version-hint.text"
    output_file = table.io.new_output(version_hint_path)
    with output_file.create(overwrite=True) as f:
        f.write(version.encode("utf-8"))
    v1_metadata_path = f"{metadata_dir}/v{version}.metadata.json"
    input_file = table.io.new_input(metadata_location)
    with input_file.open() as f_in:
        content = f_in.read()
    output_file = table.io.new_output(v1_metadata_path)
    with output_file.create() as f_out:
        f_out.write(content)
```

> **@amotl (Member), Jun 6, 2025:** I am curious how this works if the table is physically on S3, but of course it's just an optional FYI (for my information), and not a blocker question.
>
> **Contributor Author:** This relates to apache/iceberg-python#763, and I believe it would also work with real S3.
>
> **@amotl (Member), Jun 6, 2025:** "[...] when ingesting iceberg tables without a catalog. An iceberg table can thus be 'packaged' as a directory." That's super interesting, as I also had questions about this particular detail while working on crate/cratedb-toolkit#444. Thanks! In this context, I am also keen on distributing Iceberg table dumps without a catalog, because it's not necessary for the relevant use cases.
>
> **@amotl (Member), Jun 6, 2025:** About my question in this very context, I am only just now seeing that the file handles are managed by the table API. So it is plausible that this will physically write to the remote filesystem, which answers my question of how this is supposed to work. Thanks!

Finally, call the helper to publish the initial metadata version:

```python
create_risingwave_compatible_metadata(table, "1")
```
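The `create_risingwave_compatible_metadata` helper only copies files: it writes the current version number into a `version-hint.text` file, and duplicates the current metadata file under the `v<N>.metadata.json` name that catalog-less readers look for. The same logic can be sketched against a plain local directory using only the standard library; this is just an illustration, and the file names below are made up:

```python
import tempfile
from pathlib import Path


def write_version_hint(metadata_location: str, version: str) -> str:
    """Filesystem sketch of the helper above: write version-hint.text and
    copy the metadata file to v<N>.metadata.json next to it."""
    metadata_path = Path(metadata_location)
    metadata_dir = metadata_path.parent
    (metadata_dir / "version-hint.text").write_text(version)
    target = metadata_dir / f"v{version}.metadata.json"
    target.write_bytes(metadata_path.read_bytes())
    return target.name


# Example against a temporary directory standing in for the warehouse:
with tempfile.TemporaryDirectory() as tmp:
    metadata = Path(tmp) / "00000-abc.metadata.json"
    metadata.write_text('{"format-version": 2}')
    print(write_version_hint(str(metadata), "1"))  # → v1.metadata.json
    print((Path(tmp) / "version-hint.text").read_text())  # → 1
```

In the tutorial itself the same copies happen through `table.io`, which routes the writes to MinIO/S3 rather than the local filesystem.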

## RisingWave and CrateDB setup

Let's now switch to the second terminal.

To interact with both CrateDB and RisingWave we will use the `psql` command-line
utility. Let's install it:

```bash
sudo apt-get install -y postgresql-client
```

Now let's connect first to CrateDB to create a table where we will keep the
average reading for each sensor:

```bash
psql -h localhost -U crate
```

```sql
CREATE TABLE public.average_sensor_readings (
    sensor_id BIGINT PRIMARY KEY,
    average_reading DOUBLE
);
```

Press Ctrl+D to exit.

Now we need to tell RisingWave to source the data from the Iceberg table:

```bash
psql -h localhost -p 4566 -d dev -U root
```

```sql
CREATE SOURCE sensors_readings
WITH (
    connector = 'iceberg',
    database.name = 'db.db',
    warehouse.path = 's3://warehouse/',
    table.name = 'sensors_readings',
    s3.endpoint = 'http://host.containers.internal:9000',
    s3.access.key = 'minioadmin',
    s3.secret.key = 'minioadmin',
    s3.region = 'minio'
);
```

And to materialize the averages:

```sql
CREATE MATERIALIZED VIEW average_sensor_readings AS
SELECT
    sensor_id,
    AVG(reading) AS average_reading
FROM sensors_readings
GROUP BY sensor_id;
```

Press Ctrl+D to exit.
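Conceptually, a streaming engine maintains a materialized view like this incrementally: per `sensor_id` it keeps running aggregates rather than rescanning the table on every change. A toy sketch of that idea (an illustration only, not RisingWave's actual implementation):

```python
from collections import defaultdict


class RunningAverage:
    """Toy incremental maintenance of AVG(reading) GROUP BY sensor_id."""

    def __init__(self):
        self.sums = defaultdict(float)
        self.counts = defaultdict(int)

    def insert(self, key, value):
        # Each new reading only touches the per-key sum and count.
        self.sums[key] += value
        self.counts[key] += 1

    def average(self, key):
        return self.sums[key] / self.counts[key]


avg = RunningAverage()
avg.insert(1, 1.2)
avg.insert(1, 3.4)
print(round(avg.average(1), 1))  # → 2.3
```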

And we will now install some dependencies:

```bash
pip install pandas records sqlalchemy-cratedb
pip install psycopg2-binary
pip install --upgrade packaging
pip install --upgrade 'risingwave-py @ git+https://github.com/risingwavelabs/risingwave-py.git@833ca13041cb73cd96fa5cb1c898db2a558d5d8c'
```

And kick off the Python script that will keep CrateDB up-to-date in real-time:

```bash
cat << 'EOF' > cratedb_event_handler.py
import threading

import pandas as pd
import records
from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions

rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)


def cratedb_event_handler(event: pd.DataFrame):
    cratedb = records.Database("crate://", echo=True)
    for _, row in event.iterrows():
        if row["op"] in ("Insert", "UpdateInsert"):
            cratedb.query(
                "INSERT INTO public.average_sensor_readings (sensor_id, average_reading) VALUES (:sensor_id, :average_reading);",
                sensor_id=row["sensor_id"],
                average_reading=row["average_reading"],
            )
        elif row["op"] in ("Delete", "UpdateDelete"):
            cratedb.query(
                "DELETE FROM public.average_sensor_readings WHERE sensor_id = :sensor_id;",
                sensor_id=row["sensor_id"],
            )


def subscribe_average_sensor_readings_change():
    rw.on_change(
        subscribe_from="average_sensor_readings",
        handler=cratedb_event_handler,
        output_format=OutputFormat.DATAFRAME,
    )


threading.Thread(target=subscribe_average_sensor_readings_change).start()
EOF

python cratedb_event_handler.py
```
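The core of the handler is a small dispatch on each change event's `op` column: an update arrives as an `UpdateDelete`/`UpdateInsert` pair, so the delete half removes the old row and the insert half writes the new one. That routing can be illustrated in isolation with plain dictionaries (the event rows below are made up):

```python
def route_change_event(rows):
    """Map change rows (dicts carrying an 'op' key) to SQL actions."""
    actions = []
    for row in rows:
        if row["op"] in ("Insert", "UpdateInsert"):
            actions.append(("insert", row["sensor_id"], row["average_reading"]))
        elif row["op"] in ("Delete", "UpdateDelete"):
            actions.append(("delete", row["sensor_id"]))
    return actions


# A changed average shows up as a delete of the old value plus an insert
# of the new one.
events = [
    {"op": "UpdateDelete", "sensor_id": 1, "average_reading": 2.3},
    {"op": "UpdateInsert", "sensor_id": 1, "average_reading": 4.5},
]
print(route_change_event(events))  # → [('delete', 1), ('insert', 1, 4.5)]
```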


## Adding some data and seeing results materialize in real-time

Let's go back to the first terminal and run:

```python
data = pa.Table.from_pydict(
    {
        "sensor_id": [1, 1],
        "ts": [
            datetime.strptime("2025-05-14 14:00", "%Y-%m-%d %H:%M"),
            datetime.strptime("2025-05-14 15:00", "%Y-%m-%d %H:%M"),
        ],
        "reading": [1.2, 3.4],
    }
)

table.append(data)

create_risingwave_compatible_metadata(table, "2")
```

Now let's go to the third terminal and connect to CrateDB:

```bash
psql -h localhost -U crate
```

And let's inspect the `average_sensor_readings` table:

```sql
SELECT * FROM public.average_sensor_readings;
```

The average for sensor 1 is 2.3.
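That value is simply the arithmetic mean of the two readings appended so far, which we can verify by hand:

```python
readings = [1.2, 3.4]
print(round(sum(readings) / len(readings), 1))  # → 2.3
```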

Let's go back to the first terminal and run:

```python
data = pa.Table.from_pydict(
    {
        "sensor_id": [1, 1],
        "ts": [
            datetime.strptime("2025-06-04 14:00", "%Y-%m-%d %H:%M"),
            datetime.strptime("2025-06-04 15:00", "%Y-%m-%d %H:%M"),
        ],
        "reading": [5.6, 7.8],
    }
)

table.append(data)

create_risingwave_compatible_metadata(table, "3")
```

If we now check `average_sensor_readings` from the third terminal, we will see
that the average has already changed to 4.5.
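As a sanity check, the new value is the mean over all four readings ingested so far, not just the latest batch:

```python
readings = [1.2, 3.4, 5.6, 7.8]
print(round(sum(readings) / len(readings), 1))  # → 4.5
```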

[RisingWave]: https://github.com/risingwavelabs/risingwave
[Podman]: https://github.com/containers/podman
[Minio]: https://github.com/minio/minio
[PyIceberg]: https://github.com/apache/iceberg-python
[IPython]: https://github.com/ipython/ipython
12 changes: 12 additions & 0 deletions docs/integrate/etl/index.md
@@ -57,6 +57,18 @@ Tutorials and resources about configuring the managed variants, Astro and CrateDB Cloud.
- [CrateDB Apache Hop Bulk Loader transform]


## Apache Iceberg / RisingWave
:::{div}
- {ref}`iceberg-risingwave`
:::

```{toctree}
:hidden:

iceberg-risingwave
```


## Apache Kafka
:::{div}
- {ref}`kafka-connect`
2 changes: 1 addition & 1 deletion docs/integrate/risingwave/index.md
@@ -80,7 +80,7 @@ referenced below.
:::
- An example with data coming from an Apache Iceberg table and aggregations
materialized in real-time in CrateDB, using RisingWave.
- See [Stream processing from Iceberg tables to CrateDB using RisingWave].
+ See {ref}`iceberg-risingwave`.

:::{note}
We are tracking interoperability issues per [Tool: RisingWave] and appreciate