-
Notifications
You must be signed in to change notification settings - Fork 1
RisingWave: Tutorial about Apache Iceberg -> CrateDB #207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Warning Rate limit exceeded@hlcianfagna has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 2 minutes and 51 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (1)
WalkthroughA new documentation guide was added detailing how to set up a streaming ETL pipeline from Apache Iceberg tables to CrateDB using RisingWave. The integration index was updated with a link to this guide. The new document provides step-by-step instructions, code snippets, and commands for the complete setup and real-time data flow. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Minio
participant PyIceberg
participant RisingWave
participant CrateDB
participant PythonScript
User->>Minio: Start Minio container
User->>RisingWave: Start RisingWave container
User->>CrateDB: Start CrateDB container
User->>PyIceberg: Create Iceberg table (writes to Minio)
User->>RisingWave: Create source for Iceberg table
User->>RisingWave: Create materialized view (average readings)
User->>CrateDB: Create target table
PythonScript->>RisingWave: Subscribe to materialized view changes
PythonScript->>CrateDB: Apply inserts/deletes in real-time
User->>PyIceberg: Add data to Iceberg table
PyIceberg->>Minio: Write new data
User->>RisingWave: Update metadata version
RisingWave->>PythonScript: Notify of new data
PythonScript->>CrateDB: Update aggregated results
User->>CrateDB: Query for real-time results
Poem
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
docs/integrate/etl/iceberg-risingwave.md (2)
10-13
: Add comma after introductory phrase.For readability, insert a comma after the introductory phrase:
-For this example we will spin up 3 containers using [Podman] +For this example, we will spin up 3 containers using [Podman]
211-217
: Rename subscription function for consistency.The function
subscribe_average_exam_scores_change
deals with sensor readings. Rename it to accurately reflect its purpose:-def subscribe_average_exam_scores_change(): +def subscribe_average_sensor_readings_change():
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
docs/integrate/etl/iceberg-risingwave.md
(1 hunks)docs/integrate/etl/index.md
(1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/integrate/etl/iceberg-risingwave.md
[typographical] ~9-~9: Use a comma after an introductory phrase.
Context: ...time in CrateDB. ## Environment setup For this example we will spin up 3 containers using [Pod...
(COMMA_INTRODUCTORY_WORDS_PHRASES)
[uncategorized] ~29-~29: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_node And finally an instance of CrateDB:
bash podman...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~39-~39: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~41-~41: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~283-~283: Possible missing comma found.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...
(AI_HYDRA_LEO_MISSING_COMMA)
🪛 markdownlint-cli2 (0.17.2)
docs/integrate/etl/iceberg-risingwave.md
21-21: Bare URL used
null
(MD034, no-bare-urls)
124-124: Hard tabs
Column: 1
(MD010, no-hard-tabs)
125-125: Hard tabs
Column: 1
(MD010, no-hard-tabs)
142-142: Hard tabs
Column: 1
(MD010, no-hard-tabs)
144-144: Hard tabs
Column: 1
(MD010, no-hard-tabs)
147-147: Hard tabs
Column: 1
(MD010, no-hard-tabs)
150-150: Hard tabs
Column: 1
(MD010, no-hard-tabs)
🪛 GitHub Actions: docs
docs/integrate/etl/iceberg-risingwave.md
[warning] 1-1: Document isn't included in any toctree, which may cause it to be omitted from the documentation build.
🔇 Additional comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
193-196
: Clarify CrateDB connection string.Using
records.Database("crate://", ...)
may be ambiguous. Specify host and port explicitly to avoid connection errors:-def cratedb = records.Database("crate://", echo=True) +def cratedb = records.Database("crate://localhost:5432", echo=True)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (9)
docs/integrate/etl/iceberg-risingwave.md (9)
21-21
: Avoid bare URL and improve step clarity
For step-by-step instructions, wrap the URL in backticks or create a markdown link label. Also rephrase for clarity:- And let's create a bucket called `warehouse`, for this point a browser to http://localhost:9001 , login with `minioadmin` / `minioadmin` , and click on "Create bucket", enter "warehouse", and click again on "Create bucket". + And let's create a bucket called `warehouse`. For this, point your browser to [Minio Console](http://localhost:9001), log in with `minioadmin`/`minioadmin`, click **Create bucket**, enter `warehouse`, and confirm.🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
21-21: Bare URL used
null(MD034, no-bare-urls)
29-29
: Add missing comma after 'finally'
Insert a comma for readability:- And finally an instance of CrateDB: + And finally, an instance of CrateDB:🧰 Tools
🪛 LanguageTool
[uncategorized] ~29-~29: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_nodeAnd finally an instance of CrateDB:
bash podman...(AI_HYDRA_LEO_MISSING_COMMA)
39-39
: Add comma after introductory phrase- On the second console we will do the RisingWave and CrateDB setups, and we will leave a Python script running for the streaming of changes. + On the second console, we will do the RisingWave and CrateDB setups, and we will leave a Python script running for the streaming of changes.🧰 Tools
🪛 LanguageTool
[uncategorized] ~39-~39: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...(AI_HYDRA_LEO_MISSING_COMMA)
41-41
: Add comma after introductory clause- And on the 3rd console we will review how data appears in CrateDB. + And on the 3rd console, we will review how data appears in CrateDB.🧰 Tools
🪛 LanguageTool
[uncategorized] ~41-~41: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...(AI_HYDRA_LEO_MISSING_COMMA)
283-283
: Improve readability with comma- If now we check again average_sensor_readings from the 3rd console we will see the average has already changed to 4.5 + If we now check `average_sensor_readings` from the 3rd console, we will see the average has already changed to 4.5.🧰 Tools
🪛 LanguageTool
[uncategorized] ~283-~283: A comma might be missing here.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...(AI_EN_LECTOR_MISSING_PUNCTUATION_COMMA)
123-127
: Replace hard tabs with spaces in SQL code block
Convert leading tabs to spaces for consistency:```sql -CREATE TABLE public.average_sensor_readings ( - sensor_id BIGINT PRIMARY KEY, - average_reading DOUBLE -); +CREATE TABLE public.average_sensor_readings ( + sensor_id BIGINT PRIMARY KEY, + average_reading DOUBLE +);<details> <summary>🧰 Tools</summary> <details> <summary>🪛 markdownlint-cli2 (0.17.2)</summary> 124-124: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 125-125: Hard tabs Column: 1 (MD010, no-hard-tabs) </details> </details> --- `142-150`: **Replace hard tabs with spaces in SQL code block** Ensure consistent indentation using spaces: ```diff ```sql - CREATE SOURCE sensors_readings - WITH ( - connector = 'iceberg', - database.name='db.db', - warehouse.path='s3://warehouse/', - table.name='sensors_readings', - s3.endpoint = 'http://host.containers.internal:9000', - s3.access.key = 'minioadmin', - s3.secret.key = 'minioadmin', - s3.region = 'minio' - ); +CREATE SOURCE sensors_readings +WITH ( + connector = 'iceberg', + database.name = 'db.db', + warehouse.path = 's3://warehouse/', + table.name = 'sensors_readings', + s3.endpoint = 'http://host.containers.internal:9000', + s3.access.key = 'minioadmin', + s3.secret.key = 'minioadmin', + s3.region = 'minio' +);
<details> <summary>🧰 Tools</summary> <details> <summary>🪛 markdownlint-cli2 (0.17.2)</summary> 142-142: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 144-144: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 147-147: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 150-150: Hard tabs Column: 1 (MD010, no-hard-tabs) </details> </details> --- `53-55`: **Remove unused import** The `re` module is not used in this script. Remove the import to clean up dependencies: ```diff - import re
193-196
: Specify full CrateDB connection string
Usingrecords.Database("crate://", echo=True)
relies on defaults. For clarity and reliability, include host, port, and database name:- cratedb = records.Database("crate://", echo=True) + cratedb = records.Database("crate://localhost:4200/your_db?user=crate&password=<pass>", echo=True)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
docs/integrate/etl/iceberg-risingwave.md
(1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/integrate/etl/iceberg-risingwave.md
[uncategorized] ~29-~29: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_node And finally an instance of CrateDB:
bash podman...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~39-~39: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~41-~41: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~283-~283: A comma might be missing here.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...
(AI_EN_LECTOR_MISSING_PUNCTUATION_COMMA)
🪛 markdownlint-cli2 (0.17.2)
docs/integrate/etl/iceberg-risingwave.md
21-21: Bare URL used
null
(MD034, no-bare-urls)
124-124: Hard tabs
Column: 1
(MD010, no-hard-tabs)
125-125: Hard tabs
Column: 1
(MD010, no-hard-tabs)
142-142: Hard tabs
Column: 1
(MD010, no-hard-tabs)
144-144: Hard tabs
Column: 1
(MD010, no-hard-tabs)
147-147: Hard tabs
Column: 1
(MD010, no-hard-tabs)
150-150: Hard tabs
Column: 1
(MD010, no-hard-tabs)
🔇 Additional comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
138-142
: Verify Iceberg connector property names
Thedatabase.name='db.db'
setting may be incorrect—typically the property should match your Iceberg namespace (e.g.,db
). Please confirm the RisingWave Iceberg connector keys/values against the official docs.🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
142-142: Hard tabs
Column: 1(MD010, no-hard-tabs)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
187-235
: Fix nested code fences around heredoc and Python script
Nested code fences inside a bash heredoc break proper rendering and copying. Consolidate into a single bash block using a proper heredoc.--- a/docs/integrate/etl/iceberg-risingwave.md +++ b/docs/integrate/etl/iceberg-risingwave.md @@ -184,54 +184,24 @@ And kick off the Python script that will keep CrateDB up-to-date in real-time: -```bash -cat <<EOF >>cratedb_event_handler.py -``` - -```python -import threading - -import pandas as pd -import records -from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions - -rw = RisingWave( - RisingWaveConnOptions.from_connection_info( - host="localhost", port=4566, user="root", password="root", database="dev" - ) -) - - -def cratedb_event_handler(event: pd.DataFrame): - cratedb = records.Database("crate://", echo=True) - for _, row in event.iterrows(): - if row["op"] == "Insert" or row["op"] == "UpdateInsert": - cratedb.query( - "INSERT INTO public.average_sensor_readings (sensor_id,average_reading) VALUES (:sensor_id,:average_reading);", - **dict( - sensor_id=row["sensor_id"], - average_reading=row["average_reading"], - ), - ) - if row["op"] == "Delete" or row["op"] == "UpdateDelete": - cratedb.query( - "DELETE FROM public.average_sensor_readings WHERE sensor_id=:sensor_id;", - **dict(sensor_id=row["sensor_id"]), - ) - - - -def subscribe_average_sensor_readings_change(): - rw.on_change( - subscribe_from="average_sensor_readings", - handler=cratedb_event_handler, - output_format=OutputFormat.DATAFRAME, - ) - - - -threading.Thread(target=subscribe_average_sensor_readings_change).start() -``` - -```bash -EOF - -python cratedb_event_handler.py -``` +```bash +cat <<'EOF' > cratedb_event_handler.py +#!/usr/bin/env python3 +import threading +import pandas as pd +import records +from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions + +rw = RisingWave( + RisingWaveConnOptions.from_connection_info( + host="localhost", port=4566, user="root", password="root", database="dev" + ) +) + +def cratedb_event_handler(event: pd.DataFrame): + db = records.Database( + "crate://localhost:4200/your_db?user=crate&password=<pass>", echo=True + ) + for _, row in event.iterrows(): + if row["op"] in ("Insert", "UpdateInsert"): + db.query( + "INSERT INTO public.average_sensor_readings (sensor_id, average_reading) VALUES (:sensor_id, :average_reading);", + **{"sensor_id": row["sensor_id"], "average_reading": row["average_reading"]}, + ) + elif row["op"] in ("Delete", "UpdateDelete"): + db.query( + "DELETE FROM public.average_sensor_readings WHERE sensor_id = :sensor_id;", + **{"sensor_id": row["sensor_id"]}, + ) + +def subscribe_average_sensor_readings_change(): + rw.on_change( + subscribe_from="average_sensor_readings", + handler=cratedb_event_handler, + output_format=OutputFormat.DATAFRAME, + ) + +threading.Thread(target=subscribe_average_sensor_readings_change).start() + +EOF +python3 cratedb_event_handler.py +```
🧹 Nitpick comments (3)
docs/integrate/etl/iceberg-risingwave.md (3)
23-27
: Refine grammar and embed bare URL as markdown link
Minor typos and bare URL usage reduce readability. Replace "for this point a browser to http://localhost:9001" with "open a browser at Minio console".🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
25-25: Bare URL used
null(MD034, no-bare-urls)
45-49
: Add missing commas for readability
Add commas after introductory clauses to improve flow (e.g., before "On the second console..." and "And on the 3rd console...").🧰 Tools
🪛 LanguageTool
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...(AI_HYDRA_LEO_MISSING_COMMA)
135-138
: Replace hard tabs with spaces in SQL blocks
Hard tabs can break markdown formatting across platforms. Use spaces for indentation in the SQL code fences forCREATE TABLE
,CREATE SOURCE
, and related snippets.Also applies to: 149-159
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
135-135: Hard tabs
Column: 1(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1(MD010, no-hard-tabs)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
docs/integrate/etl/iceberg-risingwave.md
(1 hunks)docs/integrate/etl/index.md
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docs/integrate/etl/index.md
🧰 Additional context used
🪛 LanguageTool
docs/integrate/etl/iceberg-risingwave.md
[uncategorized] ~34-~34: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_node And finally an instance of CrateDB:
bash podman...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~293-~293: Possible missing comma found.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...
(AI_HYDRA_LEO_MISSING_COMMA)
🪛 markdownlint-cli2 (0.17.2)
docs/integrate/etl/iceberg-risingwave.md
25-25: Bare URL used
null
(MD034, no-bare-urls)
135-135: Hard tabs
Column: 1
(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1
(MD010, no-hard-tabs)
153-153: Hard tabs
Column: 1
(MD010, no-hard-tabs)
155-155: Hard tabs
Column: 1
(MD010, no-hard-tabs)
158-158: Hard tabs
Column: 1
(MD010, no-hard-tabs)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build docs
🔇 Additional comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
1-10
: Overall: Comprehensive and clear tutorial
The integration guide is well-structured, covers environment setup, Iceberg schema creation, RisingWave configuration, and real-time CrateDB updates thoroughly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent, thanks a stack. I particularly love it is a standalone end-to-end example from the very beginning, including Minio.
NB: I am thinking about if it would be a good idea to bring the code into an executable form? Let me know if you also would like to have it, but don't have the capacities.
Edit: This will be approached on a later iteration, see GH-209 ff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend to use the new layout docs/integrate/risingwave/iceberg.md
, but I can also apply the refactoring later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the new layout regime, docs/integrate/risingwave/index.md
would then include a little introduction to RisingWive itself, like the product cards previously presented at https://cratedb.com/docs/crate/clients-tools/en/latest/integrate/etl.html ff., then guiding to individual tutorials like yours, slotted into the physical tree right next to it.
As I've mentioned above, we can easily do it on a subsequent iteration.
/cc @bmunkholm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: I am already working on a patch to support your document, so you don't need to take care about this task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's the quick patch to add an index page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can easily adjust yours to follow that logic, then squash and merge?
Or do you want to continue working on the document?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hlcianfagna: Please merge the patch at your disposal to bring it in. I can add the relocation of the document later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@amotl so you will update /integrate/risingwave/index.html
later or should I patch it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can also do it any time, please go ahead. Today, I will only be at the keyboard later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (4)
docs/integrate/etl/iceberg-risingwave.md (4)
135-136
: Replace hard tabs with spaces in SQL snippets
Markdown lint flags hard tabs here. Convert to spaces (e.g., 2 or 4 spaces) for indentation consistency:- sensor_id BIGINT PRIMARY KEY, - average_reading DOUBLE + sensor_id BIGINT PRIMARY KEY, + average_reading DOUBLEand similarly in the
CREATE SOURCE
block.Also applies to: 153-160
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
135-135: Hard tabs
Column: 1(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1(MD010, no-hard-tabs)
24-27
: Improve grammar for bucket creation instruction
The phrase “for this point a browser to” is unclear. E.g., change to:And let's create a bucket called `warehouse`: open your browser at `http://localhost:9001`, log in with `minioadmin`/`minioadmin`, then click "Create bucket", enter "warehouse", and confirm.🧰 Tools
🪛 LanguageTool
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket calledwarehouse
, for this point a browser to `http://localho...(AI_HYDRA_LEO_REPLACE_FOR_AT)
40-50
: Streamline console terminology and punctuation
- Spell out “three terminals” instead of “3 consoles”.
- Add missing commas for clarity:
-We will need 3 consoles for this demonstration. +We will need three terminals for this demonstration. -On the first console we will use [PyIceberg] and [IPython] to create an Iceberg -table, and later we will add data and see how aggregations materialize in -CrateDB in real-time. +On the first terminal, we use [PyIceberg] and [IPython] to create an Iceberg +table, and later we will add data and see how aggregations materialize in +CrateDB in real time. -On the second console we will do the RisingWave and CrateDB setups, and we will -leave a Python script running for the streaming of changes. +On the second terminal, we perform the RisingWave and CrateDB setups, and leave +a Python script running for streaming changes. -And on the 3rd console we will review how data appears in CrateDB. +On the third terminal, we review how data appears in CrateDB.🧰 Tools
🪛 LanguageTool
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...(AI_HYDRA_LEO_MISSING_COMMA)
293-295
: Clarify final result sentence
Add an article and comma for readability:-If now we check again average_sensor_readings from the 3rd console we will see -the average has already changed to 4.5 +If we now check `average_sensor_readings` from the third terminal, we will see +that the average has already changed to 4.5.🧰 Tools
🪛 LanguageTool
[uncategorized] ~293-~293: Possible missing comma found.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...(AI_HYDRA_LEO_MISSING_COMMA)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
docs/integrate/etl/iceberg-risingwave.md
(1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/integrate/etl/iceberg-risingwave.md
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket called warehouse
, for this point a browser to `http://localho...
(AI_HYDRA_LEO_REPLACE_FOR_AT)
[uncategorized] ~34-~34: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_node And finally an instance of CrateDB:
bash podman...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~293-~293: Possible missing comma found.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...
(AI_HYDRA_LEO_MISSING_COMMA)
🪛 markdownlint-cli2 (0.17.2)
docs/integrate/etl/iceberg-risingwave.md
135-135: Hard tabs
Column: 1
(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1
(MD010, no-hard-tabs)
153-153: Hard tabs
Column: 1
(MD010, no-hard-tabs)
155-155: Hard tabs
Column: 1
(MD010, no-hard-tabs)
158-158: Hard tabs
Column: 1
(MD010, no-hard-tabs)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build docs
🔇 Additional comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
152-152
: Verify thedatabase.name
value in the Iceberg connector
The example usesdatabase.name='db.db'
, but it may need to match your Iceberg namespace (likely just'db'
). Ensure this maps correctly to your catalog and path.
This comment was marked as resolved.
This comment was marked as resolved.
7484e80
to
bf0d765
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
187-237
: 🛠️ Refactor suggestionConsolidate script generation into a single bash block.
This snippet uses nested fenced blocks and the append operator (
>>
), which breaks rendering and duplicates file content on repeated runs. Refactor into onebash
block using>
to overwrite the script, include a#!/usr/bin/env python3
shebang, combine the Python code inline, simplifyif
checks within
, and adddaemon=True
to the thread.
Example diff:```bash -cat <<EOF >> cratedb_event_handler.py -``` -```python +cat <<'EOF' > cratedb_event_handler.py +#!/usr/bin/env python3 +import threading +import pandas as pd +import records +from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions + +rw = RisingWave(RisingWaveConnOptions.from_connection_info( + host="localhost", port=4566, user="root", password="root", database="dev" +)) + +def cratedb_event_handler(event: pd.DataFrame): + db = records.Database("crate://", echo=True) + for _, row in event.iterrows(): + if row["op"] in ("Insert", "UpdateInsert"): + db.query( + "INSERT INTO public.average_sensor_readings (sensor_id, average_reading) VALUES (:sensor_id, :average_reading);", + sensor_id=row["sensor_id"], average_reading=row["average_reading"] + ) + elif row["op"] in ("Delete", "UpdateDelete"): + db.query( + "DELETE FROM public.average_sensor_readings WHERE sensor_id = :sensor_id;", + sensor_id=row["sensor_id"] + ) + +def subscribe_average_sensor_readings_change(): + rw.on_change( + subscribe_from="average_sensor_readings", + handler=cratedb_event_handler, + output_format=OutputFormat.DATAFRAME, + ) + +threading.Thread(target=subscribe_average_sensor_readings_change, daemon=True).start() +EOF +python3 cratedb_event_handler.py</blockquote></details> </blockquote></details> <details> <summary>🧹 Nitpick comments (3)</summary><blockquote> <details> <summary>docs/integrate/etl/iceberg-risingwave.md (3)</summary><blockquote> `24-27`: **Refine instruction clarity and grammar.** The phrase “for this point a browser to” is awkward and missing a comma. Consider rephrasing to: “At this point, open a browser to `http://localhost:9001`, log in with `minioadmin`/`minioadmin`, and click “Create bucket.” <details> <summary>🧰 Tools</summary> <details> <summary>🪛 LanguageTool</summary> [uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position. Context: ...t's create a bucket called `warehouse`, for this point a browser to `http://localho... (AI_HYDRA_LEO_REPLACE_FOR_AT) </details> </details> --- `24-49`: **General nit: add commas after introductory phrases.** Several sentences begin with “And” without a comma after the introductory phrase (“And finally an instance…”, “And on the 3rd console…”). For consistency and readability, insert commas after “And” or restructure to avoid repeated conjunctions. <details> <summary>🧰 Tools</summary> <details> <summary>🪛 LanguageTool</summary> [uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position. Context: ...t's create a bucket called `warehouse`, for this point a browser to `http://localho... (AI_HYDRA_LEO_REPLACE_FOR_AT) --- [uncategorized] ~34-~34: Possible missing comma found. Context: .../risingwave:v2.4.0 single_node ``` And finally an instance of CrateDB: ```bash podman... (AI_HYDRA_LEO_MISSING_COMMA) --- [uncategorized] ~46-~46: Possible missing comma found. Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s... (AI_HYDRA_LEO_MISSING_COMMA) --- [uncategorized] ~49-~49: Possible missing comma found. Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat... (AI_HYDRA_LEO_MISSING_COMMA) </details> </details> --- `135-138`: **Replace hard tabs with spaces in SQL block.** The CREATE TABLE snippet uses hard tabs for indentation, which violates MD010. Convert tabs to spaces to ensure consistent formatting and proper lint compliance. <details> <summary>🧰 Tools</summary> <details> <summary>🪛 markdownlint-cli2 (0.17.2)</summary> 135-135: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 136-136: Hard tabs Column: 1 (MD010, no-hard-tabs) </details> </details> </blockquote></details> </blockquote></details> <details> <summary>📜 Review details</summary> **Configuration used: CodeRabbit UI** **Review profile: CHILL** **Plan: Pro** <details> <summary>📥 Commits</summary> Reviewing files that changed from the base of the PR and between 7484e80e494948990ed832445576c3fa3d86a646 and bf0d7652dc0e1393886394702350188237cb530d. </details> <details> <summary>📒 Files selected for processing (2)</summary> * `docs/integrate/etl/iceberg-risingwave.md` (1 hunks) * `docs/integrate/etl/index.md` (1 hunks) </details> <details> <summary>🚧 Files skipped from review as they are similar to previous changes (1)</summary> * docs/integrate/etl/index.md </details> <details> <summary>🧰 Additional context used</summary> <details> <summary>🧠 Learnings (1)</summary> <details> <summary>docs/integrate/etl/iceberg-risingwave.md (1)</summary>
Learnt from: amotl
PR: #207
File: docs/integrate/etl/iceberg-risingwave.md:205-207
Timestamp: 2025-06-05T14:29:15.512Z
Learning: Therecords.Database("crate://", echo=True)
connection string for CrateDB works with defaults: localhost as host, "crate" as user, and blank password. This is valid and functional code in the records library.</details> </details><details> <summary>🪛 LanguageTool</summary> <details> <summary>docs/integrate/etl/iceberg-risingwave.md</summary> [uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position. Context: ...t's create a bucket called `warehouse`, for this point a browser to `http://localho... (AI_HYDRA_LEO_REPLACE_FOR_AT) --- [uncategorized] ~34-~34: Possible missing comma found. Context: .../risingwave:v2.4.0 single_node ``` And finally an instance of CrateDB: ```bash podman... (AI_HYDRA_LEO_MISSING_COMMA) --- [uncategorized] ~46-~46: Possible missing comma found. Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s... (AI_HYDRA_LEO_MISSING_COMMA) --- [uncategorized] ~49-~49: Possible missing comma found. Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat... (AI_HYDRA_LEO_MISSING_COMMA) --- [uncategorized] ~293-~293: Possible missing comma found. Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha... (AI_HYDRA_LEO_MISSING_COMMA) </details> </details> <details> <summary>🪛 markdownlint-cli2 (0.17.2)</summary> <details> <summary>docs/integrate/etl/iceberg-risingwave.md</summary> 135-135: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 136-136: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 153-153: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 155-155: Hard tabs Column: 1 (MD010, no-hard-tabs) --- 158-158: Hard tabs Column: 1 (MD010, no-hard-tabs) </details> </details> </details> <details> <summary>⏰ Context from checks skipped due to timeout of 90000ms (1)</summary> * GitHub Check: Build docs </details> <details> <summary>🔇 Additional comments (3)</summary><blockquote> <details> <summary>docs/integrate/etl/iceberg-risingwave.md (3)</summary> `1-3`: **Confirm anchor syntax compatibility.** The line `(iceberg-risingwave)=` provides a named anchor, but depending on the Markdown flavor (e.g., MyST for Sphinx), this syntax may vary. Please verify that this anchor renders correctly in Read the Docs. --- `178-182`: **Verify RisingWave Python client dependency.** Locking to a Git commit (`833ca13041cb73cd96fa5cb1c898db2a558d5d8c`) can impede reproducibility if that commit is moved or deleted. Confirm that this commit is stable (e.g., tagged) or consider using a released version. --- `296-301`: **Ensure the integration index is updated.** The PR description indicates that `docs/integrate/etl/index.md` was updated to reference this new guide, but that change isn’t included here. Please verify that the ETL index page includes a link to `iceberg-risingwave.md` so the tutorial is discoverable. </details> </blockquote></details> </details> <!-- This is an auto-generated comment by CodeRabbit for review status -->
bf0d765
to
363ce17
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
docs/integrate/etl/iceberg-risingwave.md (2)
31-33
: Use overwrite (>
) instead of append (>>
) and quote heredoc delimiter
The snippet uses>>
, which will duplicate the file if run multiple times, and an unquotedEOF
allows unwanted variable expansion. Consolidate into one bash block and change to:-cat <<EOF >>cratedb_event_handler.py +cat <<'EOF' > cratedb_event_handler.py
187-237
: Fix nested code fences in shell snippet
The markdown splits the heredoc and Python code into separate fenced blocks, breaking rendering and copy/paste. Merge into a singlebash
fenced block containing both the heredoc (script creation) and thepython3
execution.
🧹 Nitpick comments (4)
docs/integrate/etl/iceberg-risingwave.md (4)
24-27
: Refine grammar and clarity of bucket creation step
The sentence is awkwardly phrased. Consider rewording to improve readability, for example:-And let's create a bucket called `warehouse`, for this point a browser to `http://localhost:9001` , login with `minioadmin` / `minioadmin` , and click on "Create bucket", enter "warehouse", and click again on "Create bucket". +At this point, point your browser to `http://localhost:9001`, log in with `minioadmin`/`minioadmin`, click "Create bucket", name it `warehouse`, and confirm.🧰 Tools
🪛 LanguageTool
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket calledwarehouse
, for this point a browser to `http://localho...(AI_HYDRA_LEO_REPLACE_FOR_AT)
207-214
: Simplifyop
checks and useelif
Combine insert/update cases within
, and useelif
for deletes to make the logic clearer and mutually exclusive:-def cratedb_event_handler(event: pd.DataFrame): - cratedb = records.Database("crate://", echo=True) - for _, row in event.iterrows(): - if row["op"] == "Insert" or row["op"] == "UpdateInsert": - ... - if row["op"] == "Delete" or row["op"] == "UpdateDelete": - ... +def cratedb_event_handler(event: pd.DataFrame): + cratedb = records.Database("crate://", echo=True) + for _, row in event.iterrows(): + if row["op"] in ("Insert", "UpdateInsert"): + ... + elif row["op"] in ("Delete", "UpdateDelete"): + ...
230-231
: Mark thread as daemon
To allow the process to exit cleanly when the main thread finishes, set the thread as a daemon:-threading.Thread(target=subscribe_average_sensor_readings_change).start() +threading.Thread(target=subscribe_average_sensor_readings_change, daemon=True).start()
135-138
: Replace hard tabs with spaces
Markdown lint flags hard tabs in the SQL block. Convert tabs to spaces (e.g., 2 or 4 spaces) for consistency and proper rendering.🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
135-135: Hard tabs
Column: 1(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1(MD010, no-hard-tabs)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
docs/integrate/etl/iceberg-risingwave.md
(1 hunks)docs/integrate/etl/index.md
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- docs/integrate/etl/index.md
🧰 Additional context used
🧠 Learnings (1)
docs/integrate/etl/iceberg-risingwave.md (1)
Learnt from: amotl
PR: crate/cratedb-guide#207
File: docs/integrate/etl/iceberg-risingwave.md:205-207
Timestamp: 2025-06-05T14:29:15.512Z
Learning: The `records.Database("crate://", echo=True)` connection string for CrateDB works with defaults: localhost as host, "crate" as user, and blank password. This is valid and functional code in the records library.
🪛 LanguageTool
docs/integrate/etl/iceberg-risingwave.md
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket called warehouse
, for this point a browser to `http://localho...
(AI_HYDRA_LEO_REPLACE_FOR_AT)
[uncategorized] ~34-~34: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_node And finally an instance of CrateDB:
bash podman...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~293-~293: Possible missing comma found.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...
(AI_HYDRA_LEO_MISSING_COMMA)
🪛 markdownlint-cli2 (0.17.2)
docs/integrate/etl/iceberg-risingwave.md
135-135: Hard tabs
Column: 1
(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1
(MD010, no-hard-tabs)
153-153: Hard tabs
Column: 1
(MD010, no-hard-tabs)
155-155: Hard tabs
Column: 1
(MD010, no-hard-tabs)
158-158: Hard tabs
Column: 1
(MD010, no-hard-tabs)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build docs
🔇 Additional comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
205-207
: Therecords.Database("crate://", echo=True)
connection string is valid—it defaults tolocalhost
, usercrate
, and blank password. No change needed.
And finally an instance of CrateDB: | ||
|
||
```bash | ||
podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:5.10.7 -Cdiscovery.type=single-node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use latest
here?
podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:5.10.7 -Cdiscovery.type=single-node | |
podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:latest -Cdiscovery.type=single-node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do we usually do? the idea here was to have a version that I know worked, but we are not doing anything special we expect to break in a future version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, in some tutorials we use conservatively latest
, and on others even nightly
. We get confidence out of it by accompanying corresponding tutorials with software tests that run daily.
Another medium-conservative option would be to refer to a minor release (skipping to address patch releases) on the GA channel, e.g. crate:5.10
.
def create_risingwave_compatible_metadata(table, version): | ||
metadata_location = table.metadata_location | ||
metadata_dir = metadata_location.rsplit("/", 1)[0] | ||
version_hint_path = f"{metadata_dir}/version-hint.text" | ||
output_file = table.io.new_output(version_hint_path) | ||
with output_file.create(overwrite=True) as f: | ||
f.write(version.encode("utf-8")) | ||
v1_metadata_path = f"{metadata_dir}/v{version}.metadata.json" | ||
input_file = table.io.new_input(metadata_location) | ||
with input_file.open() as f_in: | ||
content = f_in.read() | ||
output_file = table.io.new_output(v1_metadata_path) | ||
with output_file.create() as f_out: | ||
f_out.write(content) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious how this works if the table is physically on S3, but of course it's just optional FYI (for my information), and not a blocker question.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This relates with apache/iceberg-python#763 and I believe it would work also with real S3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[...] when ingesting iceberg tables without a catalog. An iceberg table can thus be "packaged" as a directory.
That's super interesting, as I also had questions about this particular detail while working on crate/cratedb-toolkit#444. Thanks!
For this context, I am also keen on distributing Iceberg table dumps without a catalog, because it's not necessary for relevant use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About my question in this very context, I am only just now seeing that the file handles are managed by the table API. So, it is plausible that this will physically write to the remote filesystem, which answers my question how this would be supposed to work. Thanks!
input_file = table.io.new_input(metadata_location)
output_file = table.io.new_output(v1_metadata_path)
Co-authored-by: Andreas Motl <[email protected]>
363ce17
to
33a8462
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (8)
docs/integrate/etl/iceberg-risingwave.md (8)
24-26
: Grammar: Clarify and rephrase bucket creation instruction.
The phrase “for this point a browser to” is awkward and missing a comma.
Suggest:-And let's create a bucket called `warehouse`, for this point a browser to -`http://localhost:9001` , login with `minioadmin` / `minioadmin` , and click on +At this point, open a browser to `http://localhost:9001`, log in with +`minioadmin`/`minioadmin`, click “Create bucket”, enter “warehouse”, and confirm.🧰 Tools
🪛 LanguageTool
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket calledwarehouse
, for this point a browser to `http://localho...(AI_HYDRA_LEO_REPLACE_FOR_AT)
34-38
: Grammar: Add missing comma in introductory clause.
The sentence “And finally an instance of CrateDB:” reads better with a comma.-And finally an instance of CrateDB: +And finally, an instance of CrateDB:🧰 Tools
🪛 LanguageTool
[uncategorized] ~34-~34: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_nodeAnd finally an instance of CrateDB:
bash podman...(AI_HYDRA_LEO_MISSING_COMMA)
46-47
: Grammar: Insert comma after introductory phrase.
Add a comma after “On the second console” for readability.-On the second console we will do the RisingWave and CrateDB setups, and we will +On the second console, we will do the RisingWave and CrateDB setups, and we will🧰 Tools
🪛 LanguageTool
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...(AI_HYDRA_LEO_MISSING_COMMA)
49-49
: Grammar: Insert comma after introductory phrase.
Add a comma after “And on the 3rd console” for consistency.-And on the 3rd console we will review how data appears in CrateDB. +And on the 3rd console, we will review how data appears in CrateDB.🧰 Tools
🪛 LanguageTool
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...(AI_HYDRA_LEO_MISSING_COMMA)
135-139
: Formatting: Replace hard tabs in SQL code block.
TheCREATE TABLE
block uses hard tabs, which violates markdownlint. Convert to spaces:- CREATE TABLE public.average_sensor_readings ( - sensor_id BIGINT PRIMARY KEY, - average_reading DOUBLE -); + CREATE TABLE public.average_sensor_readings ( + sensor_id BIGINT PRIMARY KEY, + average_reading DOUBLE + );🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
135-135: Hard tabs
Column: 1(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1(MD010, no-hard-tabs)
150-159
: Formatting: Replace hard tabs inCREATE SOURCE
block.
Convert indentation from tabs to spaces to satisfy markdownlint:- CREATE SOURCE sensors_readings + CREATE SOURCE sensors_readings WITH ( connector = 'iceberg', - database.name='db.db', - warehouse.path='s3://warehouse/', + database.name='db.db', + warehouse.path='s3://warehouse/', s3.endpoint = 'http://host.containers.internal:9000', s3.access.key = 'minioadmin', s3.secret.key = 'minioadmin', - s3.region = 'minio' + s3.region = 'minio' );🧰 Tools
🪛 markdownlint-cli2 (0.17.2)
153-153: Hard tabs
Column: 1(MD010, no-hard-tabs)
155-155: Hard tabs
Column: 1(MD010, no-hard-tabs)
158-158: Hard tabs
Column: 1(MD010, no-hard-tabs)
205-213
: Optional: Simplifyif
logic using tuple membership.
Chained equality checks can be more concise:-def cratedb_event_handler(event: pd.DataFrame): - for _, row in event.iterrows(): - if row["op"] == "Insert" or row["op"] == "UpdateInsert": - ... - if row["op"] == "Delete" or row["op"] == "UpdateDelete": - ... +def cratedb_event_handler(event: pd.DataFrame): + for _, row in event.iterrows(): + if row["op"] in ("Insert", "UpdateInsert"): + ... + elif row["op"] in ("Delete", "UpdateDelete"): + ...Also applies to: 215-219
231-231
: Optional: Mark thread as daemon for clean exit.
Adddaemon=True
so the background thread doesn’t block process shutdown:-threading.Thread(target=subscribe_average_sensor_readings_change).start() +threading.Thread(target=subscribe_average_sensor_readings_change, daemon=True).start()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
docs/integrate/etl/iceberg-risingwave.md
(1 hunks)docs/integrate/etl/index.md
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- docs/integrate/etl/index.md
🧰 Additional context used
🧠 Learnings (1)
docs/integrate/etl/iceberg-risingwave.md (1)
Learnt from: amotl
PR: crate/cratedb-guide#207
File: docs/integrate/etl/iceberg-risingwave.md:205-207
Timestamp: 2025-06-05T14:29:15.512Z
Learning: The `records.Database("crate://", echo=True)` connection string for CrateDB works with defaults: localhost as host, "crate" as user, and blank password. This is valid and functional code in the records library.
🪛 LanguageTool
docs/integrate/etl/iceberg-risingwave.md
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket called warehouse
, for this point a browser to `http://localho...
(AI_HYDRA_LEO_REPLACE_FOR_AT)
[uncategorized] ~34-~34: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_node And finally an instance of CrateDB:
bash podman...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~293-~293: Possible missing comma found.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...
(AI_HYDRA_LEO_MISSING_COMMA)
🪛 markdownlint-cli2 (0.17.2)
docs/integrate/etl/iceberg-risingwave.md
135-135: Hard tabs
Column: 1
(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1
(MD010, no-hard-tabs)
153-153: Hard tabs
Column: 1
(MD010, no-hard-tabs)
155-155: Hard tabs
Column: 1
(MD010, no-hard-tabs)
158-158: Hard tabs
Column: 1
(MD010, no-hard-tabs)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build docs
```bash | ||
cat <<EOF >>cratedb_event_handler.py | ||
``` | ||
```python | ||
import threading | ||
import pandas as pd | ||
import records | ||
from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions | ||
rw = RisingWave( | ||
RisingWaveConnOptions.from_connection_info( | ||
host="localhost", port=4566, user="root", password="root", database="dev" | ||
) | ||
) | ||
def cratedb_event_handler(event: pd.DataFrame): | ||
cratedb = records.Database("crate://", echo=True) | ||
for _, row in event.iterrows(): | ||
if row["op"] == "Insert" or row["op"] == "UpdateInsert": | ||
cratedb.query( | ||
"INSERT INTO public.average_sensor_readings (sensor_id,average_reading) VALUES (:sensor_id,:average_reading);", | ||
**dict( | ||
sensor_id=row["sensor_id"], | ||
average_reading=row["average_reading"], | ||
), | ||
) | ||
if row["op"] == "Delete" or row["op"] == "UpdateDelete": | ||
cratedb.query( | ||
"DELETE FROM public.average_sensor_readings WHERE sensor_id=:sensor_id;", | ||
**dict(sensor_id=row["sensor_id"]), | ||
) | ||
def subscribe_average_sensor_readings_change(): | ||
rw.on_change( | ||
subscribe_from="average_sensor_readings", | ||
handler=cratedb_event_handler, | ||
output_format=OutputFormat.DATAFRAME, | ||
) | ||
threading.Thread(target=subscribe_average_sensor_readings_change).start() | ||
``` | ||
```bash | ||
EOF | ||
|
||
python cratedb_event_handler.py | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Broken nested code fences in shell snippet—refactor into valid Markdown.
The sequence of fenced blocks around cat <<EOF
and the embedded Python snippet will not render properly and duplicates content. Consolidate into a single bash block (or split into a Python preview block and a bash creation block) with proper heredoc quoting and overwrite redirection (>
).
Example one-block solution:
-def k <<EOF >>cratedb_event_handler.py
-```python
-import threading
-...
-threading.Thread(target=subscribe_average_sensor_readings_change).start()
-```
-```bash
-EOF
-python cratedb_event_handler.py
-```
+```bash
+cat << 'EOF' > cratedb_event_handler.py
+#!/usr/bin/env python3
+import threading
+import pandas as pd
+import records
+from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions
+
+rw = RisingWave(
+ RisingWaveConnOptions.from_connection_info(
+ host="localhost", port=4566, user="root", password="root", database="dev"
+ )
+)
+
+def cratedb_event_handler(event: pd.DataFrame):
+ cratedb = records.Database("crate://", echo=True)
+ for _, row in event.iterrows():
+ if row["op"] in ("Insert", "UpdateInsert"):
+ cratedb.query(
+ "INSERT INTO public.average_sensor_readings (sensor_id, average_reading) VALUES (:sensor_id, :average_reading);",
+ sensor_id=row["sensor_id"],
+ average_reading=row["average_reading"]
+ )
+ elif row["op"] in ("Delete", "UpdateDelete"):
+ cratedb.query(
+ "DELETE FROM public.average_sensor_readings WHERE sensor_id = :sensor_id;",
+ sensor_id=row["sensor_id"]
+ )
+
+def subscribe_average_sensor_readings_change():
+ rw.on_change(
+ subscribe_from="average_sensor_readings",
+ handler=cratedb_event_handler,
+ output_format=OutputFormat.DATAFRAME,
+ )
+
+threading.Thread(target=subscribe_average_sensor_readings_change, daemon=True).start()
+EOF
+
+python3 cratedb_event_handler.py
+```
<details>
<summary>🤖 Prompt for AI Agents</summary>
In docs/integrate/etl/iceberg-risingwave.md around lines 186 to 237, the nested
fenced code blocks within the shell snippet using cat <<EOF and embedded Python
cause rendering issues and duplicate content. Refactor by consolidating into a
single bash fenced block that uses a properly quoted heredoc (e.g., cat << 'EOF'
filename) with overwrite redirection, include the full Python script inside
it, and close the heredoc before running the python command. Also, add a shebang
line to the Python script, use 'in' for op checks, and mark the thread as daemon
for clean exit.
</details>
<!-- This is an auto-generated comment by CodeRabbit -->
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
docs/integrate/etl/iceberg-risingwave.md (1)
186-237
: Fix nested code fences and heredoc usage
The current split of bash and Python fences insidecat <<EOF >>cratedb_event_handler.py
breaks Markdown rendering and causes duplicate appends. Consolidate into a single bash block with a quoted heredoc (cat << 'EOF' > cratedb_event_handler.py
), embed the full Python script, close the heredoc, then runpython3
.
🧹 Nitpick comments (5)
docs/integrate/etl/iceberg-risingwave.md (5)
18-21
: Pin Minio image to a specific version
Specifying an explicit tag (instead of relying on thelatest
default) ensures reproducibility and guards against breaking changes in future Minio releases.
30-31
: Remove redundant-it
flag for detached container
When using-d
to detach, the-it
(interactive TTY) flags are unnecessary and may confuse readers.
24-26
: Refine bucket creation instruction for clarity
Consider rephrasing to:
At this point, open a browser tohttp://localhost:9001
, log in withminioadmin
/minioadmin
, click “Create bucket”, enter “warehouse”, and confirm.🧰 Tools
🪛 LanguageTool
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket calledwarehouse
, for this point a browser to `http://localho...(AI_HYDRA_LEO_REPLACE_FOR_AT)
205-215
: Simplify op conditionals usingin
andelif
Replace multiple==
checks with:if row["op"] in ("Insert", "UpdateInsert"): … elif row["op"] in ("Delete", "UpdateDelete"): …This improves readability and avoids redundant evaluations.
228-231
: Mark thread as daemon for clean exit
Usethreading.Thread(target=…, daemon=True).start()
so the script can terminate without hanging when the main thread completes.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
docs/integrate/etl/iceberg-risingwave.md
(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
docs/integrate/etl/iceberg-risingwave.md (1)
Learnt from: amotl
PR: crate/cratedb-guide#207
File: docs/integrate/etl/iceberg-risingwave.md:205-207
Timestamp: 2025-06-05T14:29:15.512Z
Learning: The `records.Database("crate://", echo=True)` connection string for CrateDB works with defaults: localhost as host, "crate" as user, and blank password. This is valid and functional code in the records library.
🪛 LanguageTool
docs/integrate/etl/iceberg-risingwave.md
[uncategorized] ~24-~24: The preposition ‘at’ seems more likely in this position.
Context: ...t's create a bucket called warehouse
, for this point a browser to `http://localho...
(AI_HYDRA_LEO_REPLACE_FOR_AT)
[uncategorized] ~34-~34: Possible missing comma found.
Context: .../risingwave:v2.4.0 single_node And finally an instance of CrateDB:
bash podman...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~46-~46: Possible missing comma found.
Context: ...in CrateDB in real-time. On the second console we will do the RisingWave and CrateDB s...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~49-~49: Possible missing comma found.
Context: ...e streaming of changes. And on the 3rd console we will review how data appears in Crat...
(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~293-~293: Possible missing comma found.
Context: ...in average_sensor_readings from the 3rd console we will see the average has already cha...
(AI_HYDRA_LEO_MISSING_COMMA)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build docs
About
An example with data coming from an Apache Iceberg table and aggregations materialized in real-time in CrateDB, using RisingWave. RisingWave is a stream processing platform that allows configuring data sources, views on that data, and destinations where results are materialized.
Preview
https://cratedb-guide--207.org.readthedocs.build/integrate/etl/iceberg-risingwave.html
References