diff --git a/01-docker-terraform/2_docker_sql/README.md b/01-docker-terraform/2_docker_sql/README.md
index 0e354638e..468180622 100644
--- a/01-docker-terraform/2_docker_sql/README.md
+++ b/01-docker-terraform/2_docker_sql/README.md
@@ -17,61 +17,21 @@ wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yell
### Running Postgres with Docker
-#### Windows
-
-Running Postgres on Windows (note the full path)
+Running Postgres on Windows, macOS and Linux. Because this command mounts a named Docker volume (`ny_taxi_postgres_data`) rather than a host folder, the same command works on every OS:
```bash
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
- -v c:/Users/alexe/git/data-engineering-zoomcamp/week_1_basics_n_setup/2_docker_sql/ny_taxi_postgres_data:/var/lib/postgresql/data \
- -p 5432:5432 \
- postgres:13
-```
-
-If you have the following error:
-
-```
-docker run -it \
- -e POSTGRES_USER="root" \
- -e POSTGRES_PASSWORD="root" \
- -e POSTGRES_DB="ny_taxi" \
- -v e:/zoomcamp/data_engineer/week_1_fundamentals/2_docker_sql/ny_taxi_postgres_data:/var/lib/postgresql/data \
+ -v ny_taxi_postgres_data:/var/lib/postgresql \
-p 5432:5432 \
- postgres:13
-
-docker: Error response from daemon: invalid mode: \Program Files\Git\var\lib\postgresql\data.
-See 'docker run --help'.
-```
-
-Change the mounting path. Replace it with the following:
-
-```
--v /e/zoomcamp/...:/var/lib/postgresql/data
-```
-
-#### Linux and MacOS
-
-
-```bash
-docker run -it \
- -e POSTGRES_USER="root" \
- -e POSTGRES_PASSWORD="root" \
- -e POSTGRES_DB="ny_taxi" \
- -v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
- -p 5432:5432 \
- postgres:13
+ postgres:18
```
-If you see that `ny_taxi_postgres_data` is empty after running
-the container, try these:
-* Deleting the folder and running Docker again (Docker will re-create the folder)
-* Adjust the permissions of the folder by running `sudo chmod a+rwx ny_taxi_postgres_data`
-
-
### CLI for Postgres
Installing `pgcli`
@@ -125,7 +85,7 @@ Running pgAdmin
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
- -p 8080:80 \
+ -p 8085:80 \
dpage/pgadmin4
```
@@ -144,11 +104,11 @@ docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
- -v c:/Users/alexe/git/data-engineering-zoomcamp/week_1_basics_n_setup/2_docker_sql/ny_taxi_postgres_data:/var/lib/postgresql/data \
+ -v ny_taxi_postgres_data:/var/lib/postgresql \
-p 5432:5432 \
--network=pg-network \
- --name pg-database \
- postgres:13
+ --name pgdatabase \
+ postgres:18
```
Run pgAdmin
@@ -157,14 +117,14 @@ Run pgAdmin
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
- -p 8080:80 \
+ -p 8085:80 \
--network=pg-network \
--name pgadmin-2 \
dpage/pgadmin4
```
-### Data ingestion
+### Data Ingestion
Running locally
@@ -187,22 +147,6 @@ Build the image
docker build -t taxi_ingest:v001 .
```
-On Linux you may have a problem building it:
-
-```
-error checking context: 'can't stat '/home/name/data_engineering/ny_taxi_postgres_data''.
-```
-
-You can solve it with `.dockerignore`:
-
-* Create a folder `data`
-* Move `ny_taxi_postgres_data` to `data` (you might need to use `sudo` for that)
-* Map `-v $(pwd)/data/ny_taxi_postgres_data:/var/lib/postgresql/data`
-* Create a file `.dockerignore` and add `data` there
-* Check [this video](https://www.youtube.com/watch?v=tOr4hTsHOzU&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb) (the middle) for more details
-
-
-
Run the script with Docker
```bash
@@ -213,47 +157,41 @@ docker run -it \
taxi_ingest:v001 \
--user=root \
--password=root \
- --host=pg-database \
+ --host=pgdatabase \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url=${URL}
```
-### Docker-Compose
+### Docker Compose
Run it:
```bash
-docker-compose up
+docker compose up
```
Run in detached mode:
```bash
-docker-compose up -d
+docker compose up -d
```
Shutting it down:
```bash
-docker-compose down
-```
-
-Note: to make pgAdmin configuration persistent, create a folder `data_pgadmin`. Change its permission via
-
-```bash
-sudo chown 5050:5050 data_pgadmin
+docker compose down
```
-and mount it to the `/var/lib/pgadmin` folder:
+To make the pgAdmin configuration persistent, add a Docker volume to the `pgadmin` service:
```yaml
services:
pgadmin:
image: dpage/pgadmin4
volumes:
- - ./data_pgadmin:/var/lib/pgadmin
+ - data_pgadmin:/var/lib/pgadmin
...
```
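+
+For the named volume to work, it must also be declared at the top level of the Compose file, mirroring how `ny_taxi_postgres_data` is declared. A minimal sketch (`driver: local` is the default and can be omitted):
+
+```yaml
+volumes:
+  data_pgadmin:
+    driver: local
+```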
diff --git a/01-docker-terraform/2_docker_sql/docker-compose.yaml b/01-docker-terraform/2_docker_sql/docker-compose.yaml
index 7b6e2c852..a37356c72 100644
--- a/01-docker-terraform/2_docker_sql/docker-compose.yaml
+++ b/01-docker-terraform/2_docker_sql/docker-compose.yaml
@@ -1,12 +1,16 @@
+volumes:
+ ny_taxi_postgres_data:
+ driver: local
+
services:
pgdatabase:
- image: postgres:13
+ image: postgres:18
environment:
- POSTGRES_USER=root
- POSTGRES_PASSWORD=root
- POSTGRES_DB=ny_taxi
volumes:
- - "./ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
+ - ny_taxi_postgres_data:/var/lib/postgresql
ports:
- "5432:5432"
pgadmin:
@@ -15,5 +19,5 @@ services:
- PGADMIN_DEFAULT_EMAIL=admin@admin.com
- PGADMIN_DEFAULT_PASSWORD=root
ports:
- - "8080:80"
+ - "8085:80"
\ No newline at end of file
diff --git a/02-workflow-orchestration/README.md b/02-workflow-orchestration/README.md
index 26d5d8a08..01827b4cc 100644
--- a/02-workflow-orchestration/README.md
+++ b/02-workflow-orchestration/README.md
@@ -9,97 +9,186 @@ Kestra is an open-source, event-driven orchestration platform that simplifies bu
---
-# Course Structure
+## Course Structure
-## 1. Conceptual Material: Introduction to Orchestration and Kestra
+- [2.1 - Introduction to Workflow Orchestration](#21-introduction-to-workflow-orchestration)
+- [2.2 - Getting Started With Kestra](#22-getting-started-with-kestra)
+- [2.3 - Hands-On Coding Project: Build ETL Data Pipelines with Kestra](#23-hands-on-coding-project-build-data-pipelines-with-kestra)
+- [2.4 - ELT Pipelines in Kestra: Google Cloud Platform](#24-elt-pipelines-in-kestra-google-cloud-platform)
+- [2.5 - Using AI for Data Engineering in Kestra](#25-using-ai-for-data-engineering-in-kestra)
+- [2.6 - Bonus](#26-bonus-deploy-to-the-cloud-optional)
+
+
+## 2.1 Introduction to Workflow Orchestration
In this section, you’ll learn the foundations of workflow orchestration, its importance, and how Kestra fits into the orchestration landscape.
-### Videos
-- **2.2.1 - Introduction to Workflow Orchestration**
- [](https://youtu.be/Np6QmmcgLCs)
+### 2.1.1 - What is Workflow Orchestration?
+
+Think of a music orchestra. It contains a variety of instruments, each with a different role to play in the music. To make sure they all come in at the right time, they follow a conductor who keeps the orchestra playing together.
+
+Now replace the instruments with tools and the conductor with an orchestrator. We often have multiple tools and platforms that need to work together, sometimes on a routine schedule and other times in response to events. That's where the orchestrator comes in to coordinate all of these tools.
+
+A workflow orchestrator might do the following tasks:
+- Run workflows which contain a number of predefined steps
+- Monitor and log errors, as well as taking a number of extra steps when they occur
+- Automatically run workflows based on schedules and events
+
+In data engineering, you often need to move data from one place to another, sometimes with some modifications made to the data in the middle. This is where a workflow orchestrator can help by managing these steps while giving us visibility into them at the same time.
+
+In this module, we're going to build our own data pipeline using ETL (Extract, Transform, Load) with Kestra at the core of the operation, but first we need to understand a bit more about how Kestra works before we can get building!
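+
+As a rough preview, here is a minimal sketch of what an ETL flow can look like in Kestra. The task ids, URL and table name are illustrative placeholders only, not the course pipeline; we'll build the real flows in section 2.3:
+
+```yaml
+id: etl_sketch
+namespace: zoomcamp
+
+tasks:
+  # Extract: download a source file over HTTP
+  - id: extract
+    type: io.kestra.plugin.core.http.Download
+    uri: https://example.com/data.csv
+
+  # Transform: process the data with a script
+  - id: transform
+    type: io.kestra.plugin.scripts.python.Script
+    script: |
+      print("transform step goes here")
+
+  # Load: copy the downloaded file into Postgres
+  - id: load
+    type: io.kestra.plugin.jdbc.postgresql.CopyIn
+    url: jdbc:postgresql://pgdatabase:5432/ny_taxi
+    username: root
+    password: root
+    format: CSV
+    from: "{{ outputs.extract.uri }}"
+    table: public.example
+```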
+
+#### Videos
+- **2.1.1 - What is Workflow Orchestration?**
+ Coming soon
+
+
+### 2.1.2 - What is Kestra?
+
+Kestra is an open-source, infinitely scalable orchestration platform that enables all engineers to manage business-critical workflows.
+
+Kestra is a great choice for workflow orchestration:
+- Build with Flow code (YAML), No-code or with the AI Copilot - flexibility in how you build your workflows
+- 1000+ Plugins - integrate with all the tools you use
+- Support for any programming language - pick the right tool for the job
+- Schedule or Event Based Triggers - have your workflows respond to data
-- **2.2.2 - Learn the Concepts of Kestra**
- [](https://youtu.be/o79n-EVpics)
+#### Videos
+
+- **2.1.2 - What is Kestra?**
+ Coming soon
### Resources
- [Quickstart Guide](https://go.kestra.io/de-zoomcamp/quickstart)
-- [Install Kestra with Docker Compose](https://go.kestra.io/de-zoomcamp/docker-compose)
-- [Tutorial](https://go.kestra.io/de-zoomcamp/tutorial)
- [What is an Orchestrator?](https://go.kestra.io/de-zoomcamp/what-is-an-orchestrator)
---
-## 2. Hands-On Coding Project: Build Data Pipelines with Kestra
+## 2.2 Getting Started with Kestra
-This week, we're gonna build ETL pipelines for Yellow and Green Taxi data from NYC’s Taxi and Limousine Commission (TLC). You will:
-1. Extract data from [CSV files](https://github.com/DataTalksClub/nyc-tlc-data/releases).
-2. Load it into Postgres or Google Cloud (GCS + BigQuery).
-3. Explore scheduling and backfilling workflows.
+In this section, you'll learn how to install Kestra, as well as the key concepts required to build your first workflow. Once our first workflow is built, we can extend this further by executing a Python script inside of a workflow.
->[!NOTE]
-If you’re using the PostgreSQL and PgAdmin docker setup from Module 1 for this week’s Kestra Workflow Orchestration exercise, ensure your PostgreSQL image version is 15 or later (preferably the latest). The MERGE statement, introduced in PostgreSQL 15, won’t work on earlier versions and will likely cause syntax errors in your kestra flows.
+You will:
+1. Install Kestra using Docker Compose
+2. Learn the concepts of Kestra to build your first workflow
+3. Execute a Python script inside of a Kestra Flow
-### File Structure
+### 2.2.1 - Installing Kestra
-The project is organized as follows:
-```
-.
-├── flows/
-│ ├── 01_getting_started_data_pipeline.yaml
-│ ├── 02_postgres_taxi.yaml
-│ ├── 02_postgres_taxi_scheduled.yaml
-│ ├── 03_postgres_dbt.yaml
-│ ├── 04_gcp_kv.yaml
-│ ├── 05_gcp_setup.yaml
-│ ├── 06_gcp_taxi.yaml
-│ ├── 06_gcp_taxi_scheduled.yaml
-│ └── 07_gcp_dbt.yaml
-```
+To install Kestra, we are going to use Docker Compose. We already have a Postgres database set up, along with pgAdmin from Module 1. We can continue to use these with Kestra but we'll need to make a few modifications to our Docker Compose file.
+
+Use [this example Docker Compose file](docker-compose.yml) to add the two new services and set up the volumes correctly.
-### Setup Kestra
+The example Compose file also enables basic authentication for Kestra: the username (`admin@kestra.io`) and password (`Admin1234`) are set under `basicAuth` in the `KESTRA_CONFIGURATION` environment variable; adjust them before starting the containers.
We'll set up Kestra using Docker Compose containing one container for the Kestra server and another for the Postgres database:
```bash
-cd 02-workflow-orchestration/docker/combined
+cd 02-workflow-orchestration
docker compose up -d
```
+**Note:** Check that `pgAdmin` isn't running on the same ports as Kestra. If so, check out the [FAQ](#troubleshooting-tips) at the bottom of the README.
+
Once the container starts, you can access the Kestra UI at [http://localhost:8080](http://localhost:8080).
+To shut down Kestra, go to the same directory and run the following command:
+
+```bash
+docker compose down
+```
+#### Add Flows to Kestra
+
+Flows can be added to Kestra by copying and pasting the YAML directly into the editor, or programmatically via Kestra's API, as shown below.
+
+**Add Flows to Kestra programmatically**
+
If you prefer to add flows programmatically using Kestra's API, run the following commands:
```bash
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/01_getting_started_data_pipeline.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_postgres_taxi.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_postgres_taxi_scheduled.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/03_postgres_dbt.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/04_gcp_kv.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/05_gcp_setup.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_taxi.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_taxi_scheduled.yaml
-curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/07_gcp_dbt.yaml
+# Import all flows: assuming username admin@kestra.io and password Admin1234 (adjust to match your username and password)
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/01_hello_world.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_python.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/03_getting_started_data_pipeline.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/04_postgres_taxi.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/05_postgres_taxi_scheduled.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_kv.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/07_gcp_setup.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/08_gcp_taxi.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/09_gcp_taxi_scheduled.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/10_chat_without_rag.yaml
+curl -X POST -u 'admin@kestra.io:Admin1234' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/11_chat_with_rag.yaml
```
+
----
+#### Videos
-## 3. ETL Pipelines in Kestra: Detailed Walkthrough
+- **2.2.1 - Installing Kestra**
+ [](https://youtu.be/wgPxC4UjoLM)
-### Getting Started Pipeline
+#### Resources
+- [Install Kestra with Docker Compose](https://go.kestra.io/de-zoomcamp/docker-compose)
-This introductory flow is added just to demonstrate a simple data pipeline which extracts data via HTTP REST API, transforms that data in Python and then queries it using DuckDB. For this stage, a new separate Postgres database is created for the exercises.
-**Note:** Check that `pgAdmin` isn't running on the same ports as Kestra. If so, check out the [FAQ](#troubleshooting-tips) at the bottom of the README.
+### 2.2.2 - Kestra Concepts
+
+To start building workflows in Kestra, we need to understand a number of concepts.
+- [Flow](https://go.kestra.io/de-zoomcamp/flow) - a container for tasks and their orchestration logic.
+- [Tasks](https://go.kestra.io/de-zoomcamp/tasks) - the steps within a flow.
+- [Inputs](https://go.kestra.io/de-zoomcamp/inputs) - dynamic values passed to the flow at runtime.
+- [Outputs](https://go.kestra.io/de-zoomcamp/outputs) - pass data between tasks and flows.
+- [Triggers](https://go.kestra.io/de-zoomcamp/triggers) - mechanism that automatically starts the execution of a flow.
+- [Execution](https://go.kestra.io/de-zoomcamp/execution) - a single run of a flow with a specific state.
+- [Variables](https://go.kestra.io/de-zoomcamp/variables) - key–value pairs that let you reuse values across tasks.
+- [Plugin Defaults](https://go.kestra.io/de-zoomcamp/plugin-defaults) - default values applied to every task of a given type within one or more flows.
+- [Concurrency](https://go.kestra.io/de-zoomcamp/concurrency) - control how many executions of a flow can run at the same time.
+
+While there are more concepts used for building powerful workflows, these are the ones we're going to use to build our data pipelines.
+
+The flow [`01_hello_world.yaml`](flows/01_hello_world.yaml) showcases all of these concepts inside of one workflow:
+- The flow has 5 tasks: 3 log tasks, a return task and a sleep task.
+- The flow takes an input called `name`.
+- There is a variable that takes the `name` input to generate a full welcome message.
+- An output is generated from the return task and is logged in a later log task.
+- There is a trigger to execute this flow every day at 10am (disabled by default in the example).
+- Plugin Defaults are used to make all three log tasks send their messages at `ERROR` level.
+- We have a concurrency limit of 2 executions; any additional executions started while 2 are running will fail.
+
+#### Videos
+- **2.2.2 - Kestra Concepts**
+ [](https://youtu.be/MNOKVx8780E)
+
+#### Resources
+- [Tutorial](https://go.kestra.io/de-zoomcamp/tutorial)
+- [Workflow Components Documentation](https://go.kestra.io/de-zoomcamp/workflow-components)
+
+### 2.2.3 - Orchestrate Python Code
+
+Now that we've built our first workflow, we can take it a step further by adding Python code into our flow. In Kestra, we can run Python code from a dedicated file or write it directly inside of our workflow.
+
+While Kestra has a huge variety of plugins available for building your workflows, you also have the option to write your own code and have Kestra execute it based on schedules or events. This means you can pick the right tools for your pipelines rather than being limited to the available plugins.
+
+In our example Python workflow, [`02_python.yaml`](flows/02_python.yaml), our code fetches the number of Docker image pulls from DockerHub and returns it as an output to Kestra. This is useful as we can access this output with other tasks, even though it was generated inside of our Python script.
+
+#### Videos
+- **2.2.3 - Orchestrate Python Code**
+ [](https://youtu.be/VAHm0R_XjqI)
-### Videos
+#### Resources
+- [How-to Guide: Python](https://go.kestra.io/de-zoomcamp/python)
-- **2.2.3 - Create an ETL Pipeline with Postgres in Kestra**
- [](https://youtu.be/OkfLX28Ecjg?si=vKbIyWo1TtjpNnvt)
-- **2.2.4 - Manage Scheduling and Backfills using Postgres in Kestra**
- [](https://youtu.be/_-li_z97zog?si=G6jZbkfJb3GAyqrd)
-- **2.2.5 - Transform Data with dbt and Postgres in Kestra**
- [](https://youtu.be/ZLp2N6p2JjE?si=tWhcvq5w4lO8v1_p)
+
+## 2.3 Hands-On Coding Project: Build Data Pipelines with Kestra
+
+Next, we're going to build ETL pipelines for Yellow and Green Taxi data from NYC’s Taxi and Limousine Commission (TLC). You will:
+1. Extract data from [CSV files](https://github.com/DataTalksClub/nyc-tlc-data/releases).
+2. Load it into Postgres or Google Cloud (GCS + BigQuery).
+3. Explore scheduling and backfilling workflows.
+
+### 2.3.1 Getting Started Pipeline
+
+This introductory flow is added just to demonstrate a simple data pipeline which extracts data via HTTP REST API, transforms that data in Python and then queries it using DuckDB. For this stage, a new separate Postgres database is created for the exercises.
```mermaid
@@ -108,15 +197,17 @@ graph LR
Transform --> Query[Query Data with DuckDB]
```
-Add the flow [`01_getting_started_data_pipeline.yaml`](flows/01_getting_started_data_pipeline.yaml) from the UI if you haven't already and execute it to see the results. Inspect the Gantt and Logs tabs to understand the flow execution.
+Add the flow [`03_getting_started_data_pipeline.yaml`](flows/03_getting_started_data_pipeline.yaml) from the UI if you haven't already and execute it to see the results. Inspect the Gantt and Logs tabs to understand the flow execution.
-### Local DB: Load Taxi Data to Postgres
+#### Videos
-Before we start loading data to GCP, we'll first play with the Yellow and Green Taxi data using a local Postgres database running in a Docker container. We'll create a new Postgres database for these examples using this [Docker Compose file](docker/postgres/docker-compose.yml). Download it into a new directory, navigate to it and run the following command to start it:
+- **2.3.1 - Getting Started Pipeline**
+ [](https://youtu.be/-KmwrCqRhic)
-```bash
-docker compose up -d
-```
+
+### 2.3.2 Local DB: Load Taxi Data to Postgres
+
+Before we start loading data to GCP, we'll first play with the Yellow and Green Taxi data using a local Postgres database running in a Docker container. We will use the same database from Module 1 which should be in the same Docker Compose file as Kestra.
The flow will extract CSV data partitioned by year and month, create tables, load data to the monthly table, and finally merge the data to the final destination table.
@@ -137,60 +228,67 @@ graph LR
classDef green fill:#32CD32,stroke:#000,stroke-width:1px;
```
-The flow code: [`02_postgres_taxi.yaml`](flows/02_postgres_taxi.yaml).
+The flow code: [`04_postgres_taxi.yaml`](flows/04_postgres_taxi.yaml).
> [!NOTE]
> The NYC Taxi and Limousine Commission (TLC) Trip Record Data provided on the [nyc.gov](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) website is currently available only in a Parquet format, but this is NOT the dataset we're going to use in this course. For the purpose of this course, we'll use the **CSV files** available [here on GitHub](https://github.com/DataTalksClub/nyc-tlc-data/releases). This is because the Parquet format can be challenging to understand by newcomers, and we want to make the course as accessible as possible — the CSV format can be easily introspected using tools like Excel or Google Sheets, or even a simple text editor.
-### Local DB: Learn Scheduling and Backfills
-
-We can now schedule the same pipeline shown above to run daily at 9 AM UTC. We'll also demonstrate how to backfill the data pipeline to run on historical data.
+#### Videos
-Note: given the large dataset, we'll backfill only data for the green taxi dataset for the year 2019.
+- **2.3.2 - Local DB: Load Taxi Data to Postgres**
+ Coming soon
-The flow code: [`02_postgres_taxi_scheduled.yaml`](flows/02_postgres_taxi_scheduled.yaml).
+#### Resources
+- [Docker Compose with Kestra, Postgres and pgAdmin](docker-compose.yml)
-### Local DB: Orchestrate dbt Models (Optional)
+### 2.3.3 Local DB: Learn Scheduling and Backfills
-Now that we have raw data ingested into a local Postgres database, we can use dbt to transform the data into meaningful insights. The flow will sync the dbt models from Git to Kestra and run the `dbt build` command to build the models.
+We can now schedule the same pipeline shown above to run daily at 9 AM UTC. We'll also demonstrate how to backfill the data pipeline to run on historical data.
-```mermaid
-graph LR
- Start[Select dbt command] --> Sync[Sync Namespace Files]
- Sync --> DbtBuild[Run dbt CLI]
-```
+Note: given the large dataset, we'll backfill only data for the green taxi dataset for the year 2019.
-This gives you a quick showcase of dbt inside of Kestra so the homework tasks do not depend on it. The course will go into more detail of dbt in [Week 4](../04-analytics-engineering).
+The flow code: [`05_postgres_taxi_scheduled.yaml`](flows/05_postgres_taxi_scheduled.yaml).
-The flow code: [`03_postgres_dbt.yaml`](flows/03_postgres_dbt.yaml).
+#### Videos
-### Resources
-- [pgAdmin Download](https://www.pgadmin.org/download/)
-- [Postgres DB Docker Compose](docker/postgres/docker-compose.yml)
+- **2.3.3 - Scheduling and Backfills**
+ Coming soon
---
-## 4. ETL Pipelines in Kestra: Google Cloud Platform
+## 2.4 ELT Pipelines in Kestra: Google Cloud Platform
Now that you've learned how to build ETL pipelines locally using Postgres, we are ready to move to the cloud. In this section, we'll load the same Yellow and Green Taxi data to Google Cloud Platform (GCP) using:
1. Google Cloud Storage (GCS) as a data lake
2. BigQuery as a data warehouse.
-### Videos
+### 2.4.1 - ETL vs ELT
+
+In 2.3, we made an ETL pipeline inside of Kestra:
+- **Extract:** Firstly, we extract the dataset from GitHub
+- **Transform:** Next, we transform it with Python
+- **Load:** Finally, we load it into our Postgres database
-- **2.2.6 - Create an ETL Pipeline with GCS and BigQuery in Kestra**
- [](https://youtu.be/nKqjjLJ7YXs)
-- **2.2.7 - Manage Scheduling and Backfills using BigQuery in Kestra**
- [](https://youtu.be/DoaZ5JWEkH0)
-- **2.2.8 - Transform Data with dbt and BigQuery in Kestra**
- [](https://youtu.be/eF_EdV4A1Wk)
+While this is very standard across the industry, sometimes it makes sense to change the order when working with the cloud. If you're working with a large dataset, like the Yellow Taxi data, there can be benefits to extracting and loading straight into a data warehouse, and then performing transformations directly in the data warehouse. When working with BigQuery, we will use ELT:
+- **Extract:** Firstly, we extract the dataset from GitHub
+- **Load:** Next, we load this dataset (in this case, a CSV file) into a data lake (Google Cloud Storage)
+- **Transform:** Finally, we can create a table inside of our data warehouse (BigQuery) which uses the data from our data lake to perform our transformations.
-### Setup Google Cloud Platform (GCP)
+Loading into the data warehouse before transforming means we can utilize the cloud's performance benefits for transforming large datasets. What might take a long time on a local machine can take a fraction of the time in the cloud.
+
+Over the next few videos, we'll look at setting up BigQuery and transforming the Yellow Taxi dataset.
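+
+As a minimal sketch of what the Load and Transform steps can look like in a Kestra flow (the bucket, dataset and table names are placeholders and the snippet is simplified; the real flows are referenced later in this section):
+
+```yaml
+tasks:
+  # Load: upload the extracted CSV into the data lake (GCS)
+  - id: upload_to_gcs
+    type: io.kestra.plugin.gcp.gcs.Upload
+    from: "{{ outputs.extract.outputFiles['green_tripdata_2019-01.csv'] }}"
+    to: "gs://my-bucket/green_tripdata_2019-01.csv"
+
+  # Transform: build the final table inside the data warehouse (BigQuery)
+  - id: bq_transform
+    type: io.kestra.plugin.gcp.bigquery.Query
+    sql: |
+      CREATE OR REPLACE TABLE my_dataset.green_tripdata AS
+      SELECT * FROM my_dataset.green_tripdata_ext;
+```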
+
+#### Videos
+
+- **2.4.1 - ETL vs ELT**
+ Coming soon
+
+### 2.4.2 Setup Google Cloud Platform (GCP)
Before we start loading data to GCP, we need to set up the Google Cloud Platform.
-First, adjust the following flow [`04_gcp_kv.yaml`](flows/04_gcp_kv.yaml) to include your service account, GCP project ID, BigQuery dataset and GCS bucket name (_along with their location_) as KV Store values:
+First, adjust the following flow [`06_gcp_kv.yaml`](flows/06_gcp_kv.yaml) to include your service account, GCP project ID, BigQuery dataset and GCS bucket name (_along with their location_) as KV Store values:
- GCP_CREDS
- GCP_PROJECT_ID
- GCP_LOCATION
@@ -201,12 +299,18 @@ First, adjust the following flow [`04_gcp_kv.yaml`](flows/04_gcp_kv.yaml) to inc
> [!WARNING]
> The `GCP_CREDS` service account contains sensitive information. Ensure you keep it secure and do not commit it to Git. Keep it as secure as your passwords.
-### Create GCP Resources
+#### Create GCP Resources
+
+If you haven't already created the GCS bucket and BigQuery dataset in the first week of the course, you can use this flow to create them: [`07_gcp_setup.yaml`](flows/07_gcp_setup.yaml).
+
+#### Videos
-If you haven't already created the GCS bucket and BigQuery dataset in the first week of the course, you can use this flow to create them: [`05_gcp_setup.yaml`](flows/05_gcp_setup.yaml).
+- **2.4.2 - Setup Google Cloud Platform (GCP)**
+ Coming soon
+### 2.4.3 GCP Workflow: Load Taxi Data to BigQuery
-### GCP Workflow: Load Taxi Data to BigQuery
+Now that Google Cloud is set up with a storage bucket, we can start the ELT process.
```mermaid
graph LR
@@ -227,42 +331,273 @@ graph LR
classDef green fill:#32CD32,stroke:#000,stroke-width:1px;
```
-The flow code: [`06_gcp_taxi.yaml`](flows/06_gcp_taxi.yaml).
+The flow code: [`08_gcp_taxi.yaml`](flows/08_gcp_taxi.yaml).
-### GCP Workflow: Schedule and Backfill Full Dataset
+#### Videos
+
+- **2.4.3 - Create an ETL Pipeline with GCS and BigQuery in Kestra**
+ Coming soon
+
+### 2.4.4 GCP Workflow: Schedule and Backfill Full Dataset
We can now schedule the same pipeline shown above to run daily at 9 AM UTC for the green dataset and at 10 AM UTC for the yellow dataset. You can backfill historical data directly from the Kestra UI.
Since we now process data in a cloud environment with infinitely scalable storage and compute, we can backfill the entire dataset for both the yellow and green taxi data without the risk of running out of resources on our local machine.
-The flow code: [`06_gcp_taxi_scheduled.yaml`](flows/06_gcp_taxi_scheduled.yaml).
+The flow code: [`09_gcp_taxi_scheduled.yaml`](flows/09_gcp_taxi_scheduled.yaml).
+
+#### Videos
+
+- **2.4.4 - GCP Workflow: Schedule and Backfills**
+ Coming soon
+
+---
+
+## 2.5 Using AI for Data Engineering in Kestra
+
+This section builds on what you learned earlier in Module 2 to show you how AI can speed up workflow development.
+
+By the end of this section, you will:
+- Understand why context engineering matters when collaborating with LLMs
+- Use AI Copilot to build Kestra flows faster
+- Use Retrieval Augmented Generation (RAG) in data pipelines
+
+### Prerequisites
+
+- Completion of earlier sections in Module 2 (Workflow Orchestration with Kestra)
+- Kestra running locally
+- Google Cloud account with access to Gemini API (there's a generous free tier!)
+
+---
+
+### 2.5.1 Introduction: Why AI for Workflows?
+
+As data engineers, we spend significant time writing boilerplate code, searching documentation, and structuring data pipelines. AI tools can help us:
+
+- **Generate workflows faster**: Describe what you want to accomplish in natural language instead of writing YAML from scratch
+- **Avoid errors**: Get syntax-correct, up-to-date workflow code that follows best practices
+
+However, AI is only as good as the context we provide. This section teaches you how to engineer that context for reliable, production-ready data workflows.
-### GCP Workflow: Orchestrate dbt Models (Optional)
+#### Videos
-Now that we have raw data ingested into BigQuery, we can use dbt to transform that data. The flow will sync the dbt models from Git to Kestra and run the `dbt build` command to build the models:
+- **2.5.1 - Using AI for Data Engineering**
+ Coming soon
+
+---
+
+### 2.5.2 Context Engineering with ChatGPT
+
+Let's start by seeing what happens when AI lacks proper context.
+
+#### Experiment: ChatGPT Without Context
+
+1. **Open ChatGPT in a private browser window** (to avoid any existing chat context): https://chatgpt.com
+
+2. **Enter this prompt:**
+ ```
+ Create a Kestra flow that loads NYC taxi data from a CSV file to BigQuery. The flow should extract data, upload to GCS, and load to BigQuery.
+ ```
+
+3. **Observe the results:**
+ - ChatGPT will generate a Kestra flow, but it likely contains:
+ - **Outdated plugin syntax** e.g., old task types that have been renamed
+ - **Incorrect property names** e.g., properties that don't exist in current versions
+ - **Hallucinated features** e.g., tasks, triggers or properties that never existed
+
+#### Why Does This Happen?
+
+Large Language Models (LLMs) like GPT models from OpenAI are trained on data up to a specific point in time (knowledge cutoff). They don't automatically know about:
+- Software updates and new releases
+- Renamed plugins or changed APIs
+
+This is the fundamental challenge of using AI: **the model can only work with information it has access to.**
+
+#### Key Learning: Context is Everything
+
+Without proper context:
+- ❌ Generic AI assistants hallucinate outdated or incorrect code
+- ❌ You can't trust the output for production use
+
+With proper context:
+- ✅ AI generates accurate, current, production-ready code
+- ✅ You can iterate faster by letting AI generate boilerplate workflow code
+
+In the next section, we'll see how Kestra's AI Copilot solves this problem.
+
+#### Videos
+
+- **2.5.2 - Context Engineering with ChatGPT**
+ Coming soon
+
+---
+
+### 2.5.3 AI Copilot in Kestra
+
+Kestra's AI Copilot is specifically designed to generate and modify Kestra flows with full context about the latest plugins, workflow syntax, and best practices.
+
+#### Setup AI Copilot
+
+Before using AI Copilot, you need to configure Gemini API access in your Kestra instance.
+
+**Step 1: Get Your Gemini API Key**
+
+1. Visit Google AI Studio: https://aistudio.google.com/app/apikey
+2. Sign in with your Google account
+3. Click "Create API Key"
+4. Copy the generated key (keep it secure!)
+
+> [!WARNING]
+> Never commit API keys to Git. Always use environment variables or Kestra's KV Store.
+
+**Step 2: Configure Kestra AI Copilot**
+
+Add the following to your Kestra configuration. You can do this by modifying your `docker-compose.yml` file from 2.2:
+
+```yaml
+services:
+ kestra:
+ environment:
+ KESTRA_CONFIGURATION: |
+ kestra:
+ ai:
+ type: gemini
+ gemini:
+ model-name: gemini-2.5-flash
+ api-key: ${GEMINI_API_KEY}
+```
+
+Then restart Kestra:
+```bash
+cd 02-workflow-orchestration
+export GEMINI_API_KEY="your-api-key-here"
+docker compose up -d
+```
+
+#### Exercise: ChatGPT vs AI Copilot Comparison
+
+**Objective:** Learn why context engineering matters.
+
+1. **Open Kestra UI** at http://localhost:8080
+2. **Create a new flow** and open the Code editor panel
+3. **Click the AI Copilot button** (sparkle icon ✨) in the top-right corner
+4. **Enter the same exact prompt** we used with ChatGPT:
+ ```
+ Create a Kestra flow that loads NYC taxi data from a CSV file to BigQuery. The flow should extract data, upload to GCS, and load to BigQuery.
+ ```
+5. **Compare the outputs:**
+ - ✅ Copilot generates executable, working YAML
+ - ✅ Copilot uses correct plugin types and properties
+ - ✅ Copilot follows current Kestra best practices
+
+**Key Learning:** Context matters! AI Copilot has access to current Kestra documentation, so it generates Kestra flows far more reliably than a generic AI assistant like ChatGPT.
+
+#### Videos
+
+- **2.5.3 - AI Copilot in Kestra**
+ Coming soon
+
+
+### 2.5.4 Bonus: Retrieval Augmented Generation (RAG)
+
+To learn more about how to provide context to your prompts, this bonus section demonstrates how to use RAG.
+
+#### What is RAG?
+
+**RAG (Retrieval Augmented Generation)** is a technique that:
+1. **Retrieves** relevant information from your data sources
+2. **Augments** the AI prompt with this context
+3. **Generates** a response grounded in real data
+
+This solves the hallucination problem by ensuring the AI has access to current, accurate information at query time.
+
+#### How RAG Works in Kestra
```mermaid
graph LR
- Start[Select dbt command] --> Sync[Sync Namespace Files]
- Sync --> Build[Run dbt Build Command]
+ A[Ask AI] --> B[Fetch Docs]
+ B --> C[Create Embeddings]
+ C --> D[Find Similar Content]
+ D --> E[Add Context to Prompt]
+ E --> F[LLM Answer]
```
-This gives you a quick showcase of dbt inside of Kestra so the homework tasks do not depend on it. The course will go into more detail of dbt in [Week 4](../04-analytics-engineering).
+**The Process:**
+1. **Ingest documents**: Load documentation, release notes, or other data sources
+2. **Create embeddings**: Convert text into vector representations using an LLM
+3. **Store embeddings**: Save vectors in Kestra's KV Store (or a vector database)
+4. **Query with context**: When you ask a question, retrieve relevant embeddings and include them in the prompt
+5. **Generate response**: The LLM has real context and provides accurate answers
-The flow code: [`07_gcp_dbt.yaml`](flows/07_gcp_dbt.yaml).
+#### Exercise: Retrieval With vs Without Context
----
+**Objective:** Understand how RAG eliminates hallucinations by grounding LLM responses in real data.
+
+**Part A: Without RAG**
+1. Navigate to the [`10_chat_without_rag.yaml`](flows/10_chat_without_rag.yaml) flow in your Kestra UI
+2. Click **Execute**
+3. Wait for the execution to complete
+4. Open the **Logs** tab
+5. Read the output - notice how the response about "Kestra 1.1 features" is:
+ - Vague or generic
+ - Potentially incorrect
+ - Missing specific details
+ - Based only on the model's training data (which may be outdated)
+
+**Part B: With RAG**
+1. Navigate to the [`11_chat_with_rag.yaml`](flows/11_chat_with_rag.yaml) flow
+2. Click **Execute**
+3. Watch the execution:
+ - First task: **Ingests** Kestra 1.1 release documentation, creates **embeddings** and stores them
+ - Second task: **Prompts LLM** with context retrieved from stored embeddings
+4. Open the **Logs** tab
+5. Compare this output with the previous one - notice how it's:
+ - ✅ Specific and detailed
+ - ✅ Accurate with real features from the release
+ - ✅ Grounded in actual documentation
+
+**Key Learning:** RAG (Retrieval Augmented Generation) grounds AI responses in current documentation, eliminating hallucinations and providing accurate, context-aware answers.
-## 5. Bonus: Deploy to the Cloud (Optional)
+#### RAG Best Practices
-Now that we've got our ETL pipeline working both locally and in the cloud, we can deploy Kestra to the cloud so it can continue to orchestrate our ETL pipelines monthly with our configured schedules, We'll cover how you can install Kestra on Google Cloud in Production, and automatically sync and deploy your workflows from a Git repository.
+1. **Keep documents updated**: Regularly re-ingest to ensure current information
+2. **Chunk appropriately**: Break large documents into meaningful chunks
+3. **Test retrieval quality**: Verify that the right documents are retrieved
+
+#### Additional AI Resources
+
+Kestra Documentation:
+- [AI Tools Overview](https://kestra.io/docs/ai-tools)
+- [AI Copilot](https://kestra.io/docs/ai-tools/ai-copilot)
+- [RAG Workflows](https://kestra.io/docs/ai-tools/ai-rag-workflows)
+- [AI Workflows](https://kestra.io/docs/ai-tools/ai-workflows)
+- [Kestra Blueprints](https://kestra.io/blueprints) - Pre-built workflow examples
+
+Kestra Plugin Documentation:
+- [AI Plugin](https://kestra.io/plugins/plugin-ai)
+- [RAG Tasks](https://kestra.io/plugins/plugin-ai/rag)
+
+External Documentation:
+- [Google Gemini](https://ai.google.dev/docs)
+- [Google AI Studio](https://aistudio.google.com/)
+
+#### Videos
+
+- **2.5.4 (Bonus) - Retrieval Augmented Generation**
+ Coming soon
+
+## 2.6 Bonus: Deploy to the Cloud (Optional)
+
+Now that we've got all our pipelines working and we know how to quickly create new flows with Kestra's AI Copilot, we can deploy Kestra to the cloud so it can continue to orchestrate our scheduled pipelines.
+
+In this bonus section, we'll cover how you can deploy Kestra on Google Cloud and automatically sync your workflows from a Git repository.
Note: When committing your workflows to Kestra, make sure your workflow doesn't contain any sensitive information. You can use [Secrets](https://go.kestra.io/de-zoomcamp/secret) and the [KV Store](https://go.kestra.io/de-zoomcamp/kv-store) to keep sensitive data out of your workflow logic.
-### Videos
+#### Videos
-- **2.2.9 - Deploy Workflows to the Cloud with Git**
- [](https://youtu.be/l-wC71tI3co)
+- **2.6.1 - Deploy Workflows to the Cloud with Git**
+ Coming soon
Resources
@@ -271,7 +606,7 @@ Resources
- [Using Git in Kestra](https://go.kestra.io/de-zoomcamp/git)
- [Deploy Flows with GitHub Actions](https://go.kestra.io/de-zoomcamp/deploy-github-actions)
-## 6. Additional Resources 📚
+## 2.7 Additional Resources 📚
- Check [Kestra Docs](https://go.kestra.io/de-zoomcamp/docs)
- Explore our [Blueprints](https://go.kestra.io/de-zoomcamp/blueprints) library
@@ -284,131 +619,12 @@ Resources
### Troubleshooting tips
If you face any issues with Kestra flows in Module 2, make sure to use the following Docker images/ports:
-- `kestra/kestra:latest` is correct = latest stable release, while `kestra/kestra:develop` is incorrect as this is a bleeding-edge development version that might contain bugs
-- `postgres:latest` — make sure to use Postgres image, which uses **PostgreSQL 15** or higher
-- If you run `pgAdmin` or something else on port 8080, you can adjust Kestra docker-compose to use a different port, e.g. change port mapping to 18080 instead of 8080, and then access Kestra UI in your browser from http://localhost:18080/ instead of from http://localhost:8080/
-
-If you're using Linux, you might encounter `Connection Refused` errors when connecting to the Postgres DB from within Kestra. This is because `host.docker.internal` works differently on Linux. Using the modified Docker Compose file below, you can run both Kestra and its dedicated Postgres DB, as well as the Postgres DB for the exercises all together. You can access it within Kestra by referring to the container name `postgres_zoomcamp` instead of `host.docker.internal` in `pluginDefaults`. This applies to pgAdmin as well. If you'd prefer to keep it in separate Docker Compose files, you'll need to setup a Docker network so that they can communicate with each other.
-
-
-Docker Compose Example
-
-This Docker Compose has the Zoomcamp DB container and pgAdmin container added to it, so it's all in one file.
-
-Changes include:
-- New `volume` for the Zoomcamp DB container
-- Zoomcamp DB container is added and renamed to prevent clashes with the Kestra DB container
-- Depends on condition is added to make sure Kestra is running before it starts
-- pgAdmin is added and running on Port 8085 so it doesn't clash wit Kestra which uses 8080 and 8081
-
-```yaml
-volumes:
- postgres-data:
- driver: local
- kestra-data:
- driver: local
- zoomcamp-data:
- driver: local
-
-services:
- postgres:
- image: postgres
- volumes:
- - postgres-data:/var/lib/postgresql/data
- environment:
- POSTGRES_DB: kestra
- POSTGRES_USER: kestra
- POSTGRES_PASSWORD: k3str4
- healthcheck:
- test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"]
- interval: 30s
- timeout: 10s
- retries: 10
-
- kestra:
- image: kestra/kestra:latest
- pull_policy: always
- # Note that this setup with a root user is intended for development purpose.
- # Our base image runs without root, but the Docker Compose implementation needs root to access the Docker socket
- # To run Kestra in a rootless mode in production, see: https://kestra.io/docs/installation/podman-compose
- user: "root"
- command: server standalone
- volumes:
- - kestra-data:/app/storage
- - /var/run/docker.sock:/var/run/docker.sock
- - /tmp/kestra-wd:/tmp/kestra-wd
- environment:
- KESTRA_CONFIGURATION: |
- datasources:
- postgres:
- url: jdbc:postgresql://postgres:5432/kestra
- driverClassName: org.postgresql.Driver
- username: kestra
- password: k3str4
- kestra:
- server:
- basicAuth:
- enabled: false
- username: "admin@kestra.io" # it must be a valid email address
- password: kestra
- repository:
- type: postgres
- storage:
- type: local
- local:
- basePath: "/app/storage"
- queue:
- type: postgres
- tasks:
- tmpDir:
- path: /tmp/kestra-wd/tmp
- url: http://localhost:8080/
- ports:
- - "8080:8080"
- - "8081:8081"
- depends_on:
- postgres:
- condition: service_started
-
- postgres_zoomcamp:
- image: postgres
- environment:
- POSTGRES_USER: kestra
- POSTGRES_PASSWORD: k3str4
- POSTGRES_DB: postgres-zoomcamp
- ports:
- - "5432:5432"
- volumes:
- - zoomcamp-data:/var/lib/postgresql/data
- depends_on:
- kestra:
- condition: service_started
-
- pgadmin:
- image: dpage/pgadmin4
- environment:
- - PGADMIN_DEFAULT_EMAIL=admin@admin.com
- - PGADMIN_DEFAULT_PASSWORD=root
- ports:
- - "8085:80"
- depends_on:
- postgres_zoomcamp:
- condition: service_started
-```
-
-
+- `image: kestra/kestra:v1.1` - pin your Kestra Docker image to this version to ensure reproducibility; do NOT use `kestra/kestra:develop`, as this is a bleeding-edge development version that might contain bugs
+- `postgres:18` - make sure to pin your Postgres image to version 18
+- If you run `pgAdmin` or something else on port 8080, you can adjust the Kestra service in `docker-compose.yml` to use a different port, e.g. change the port mapping to 18080 instead of 8080 and then access the Kestra UI in your browser from http://localhost:18080/ instead of from http://localhost:8080/
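+
+  For example, in the Kestra service of `docker-compose.yml`:
+
+  ```yaml
+  services:
+    kestra:
+      ports:
+        - "18080:8080"
+        - "8081:8081"
+  ```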
-If you are still facing any issues, stop and remove your existing Kestra + Postgres containers and start them again using `docker-compose up -d`. If this doesn't help, post your question on the DataTalksClub Slack or on Kestra's Slack http://kestra.io/slack.
+If you are still facing any issues, stop and remove your existing Kestra + Postgres containers and start them again using `docker compose up -d`. If this doesn't help, post your question on the DataTalksClub Slack or on Kestra's Slack http://kestra.io/slack.
-- **DE Zoomcamp FAQ - PostgresDB Setup and Installing pgAdmin**
- [](https://youtu.be/ywAPYNYFaB4?si=5X9AD0nFAT2WLWgS)
-- **DE Zoomcamp FAQ - Port and Images**
- [](https://youtu.be/l2M2mW76RIU?si=oqyZ7KUaI27vi90V)
-- **DE Zoomcamp FAQ - Docker Setup**
- [](https://youtu.be/73g6qJN0HcM)
-
-
-
If you encounter similar errors to:
```
BigQueryError{reason=invalid, location=null,
@@ -425,8 +641,7 @@ It means that the CSV file you're trying to load into BigQuery has a mismatch in
## Homework
-See the [2025 cohort folder](../cohorts/2025/02-workflow-orchestration/homework.md)
-
+See the [2026 cohort folder](../cohorts/2026/02-workflow-orchestration/homework.md)
---
@@ -434,11 +649,6 @@ See the [2025 cohort folder](../cohorts/2025/02-workflow-orchestration/homework.
Did you take notes? You can share them by creating a PR to this file!
-* [Notes from Manuel Guerra)](https://github.com/ManuelGuerra1987/data-engineering-zoomcamp-notes/blob/main/2_Workflow-Orchestration-(Kestra)/README.md)
-* [Notes from Horeb Seidou](https://spotted-hardhat-eea.notion.site/Week-2-Workflow-Orchestration-17129780dc4a80148debf61e6453fffe)
-* [Notes from Livia](https://docs.google.com/document/d/1Y_QMonvEtFPbXIzmdpCSVsKNC1BWAHFBA1mpK9qaZko/edit?usp=sharing)
-* [2025 Gitbook Notes from Tinker0425](https://data-engineering-zoomcamp-2025-t.gitbook.io/tinker0425/module-2/introduction-to-module-2)
-* [Notes from Mercy Markus: Linux/Fedora Tweaks and Tips](https://mercymarkus.com/posts/2025/series/dtc-dez-jan-2025/dtc-dez-2025-module-2/)
* Add your notes above this line
---
@@ -448,4 +658,4 @@ Did you take notes? You can share them by creating a PR to this file!
* 2022: [notes](../cohorts/2022/week_2_data_ingestion#community-notes) and [videos](../cohorts/2022/week_2_data_ingestion)
* 2023: [notes](../cohorts/2023/week_2_workflow_orchestration#community-notes) and [videos](../cohorts/2023/week_2_workflow_orchestration)
* 2024: [notes](../cohorts/2024/02-workflow-orchestration#community-notes) and [videos](../cohorts/2024/02-workflow-orchestration)
-
+* 2025: [notes](../cohorts/2025/02-workflow-orchestration/README.md#community-notes) and [videos](../cohorts/2025/02-workflow-orchestration)
diff --git a/02-workflow-orchestration/docker/combined/docker-compose.yml b/02-workflow-orchestration/docker-compose.yml
similarity index 73%
rename from 02-workflow-orchestration/docker/combined/docker-compose.yml
rename to 02-workflow-orchestration/docker-compose.yml
index 92b931ced..5036c36dc 100644
--- a/02-workflow-orchestration/docker/combined/docker-compose.yml
+++ b/02-workflow-orchestration/docker-compose.yml
@@ -1,16 +1,35 @@
volumes:
- postgres-data:
+ ny_taxi_postgres_data:
driver: local
- kestra-data:
+ kestra_postgres_data:
driver: local
- zoomcamp-data:
+ kestra_data:
driver: local
services:
- postgres:
- image: postgres
+ pgdatabase:
+ image: postgres:18
+ environment:
+ POSTGRES_USER: root
+ POSTGRES_PASSWORD: root
+ POSTGRES_DB: ny_taxi
+ ports:
+ - "5432:5432"
+ volumes:
+ - ny_taxi_postgres_data:/var/lib/postgresql
+
+ pgadmin:
+ image: dpage/pgadmin4
+ environment:
+ - PGADMIN_DEFAULT_EMAIL=admin@admin.com
+ - PGADMIN_DEFAULT_PASSWORD=root
+ ports:
+ - "8085:80"
+
+ kestra_postgres:
+ image: postgres:18
volumes:
- - postgres-data:/var/lib/postgresql/data
+ - kestra_postgres_data:/var/lib/postgresql
environment:
POSTGRES_DB: kestra
POSTGRES_USER: kestra
@@ -22,7 +41,7 @@ services:
retries: 10
kestra:
- image: kestra/kestra:v0.20.7
+ image: kestra/kestra:v1.1
pull_policy: always
# Note that this setup with a root user is intended for development purpose.
# Our base image runs without root, but the Docker Compose implementation needs root to access the Docker socket
@@ -30,23 +49,22 @@ services:
user: "root"
command: server standalone
volumes:
- - kestra-data:/app/storage
+ - kestra_data:/app/storage
- /var/run/docker.sock:/var/run/docker.sock
- /tmp/kestra-wd:/tmp/kestra-wd
environment:
KESTRA_CONFIGURATION: |
datasources:
postgres:
- url: jdbc:postgresql://postgres:5432/kestra
+ url: jdbc:postgresql://kestra_postgres:5432/kestra
driverClassName: org.postgresql.Driver
username: kestra
password: k3str4
kestra:
server:
basicAuth:
- enabled: false
username: "admin@kestra.io" # it must be a valid email address
- password: kestra
+ password: Admin1234
repository:
type: postgres
storage:
@@ -63,30 +81,6 @@ services:
- "8080:8080"
- "8081:8081"
depends_on:
- postgres:
- condition: service_started
-
- postgres_zoomcamp:
- image: postgres
- environment:
- POSTGRES_USER: kestra
- POSTGRES_PASSWORD: k3str4
- POSTGRES_DB: postgres-zoomcamp
- ports:
- - "5432:5432"
- volumes:
- - zoomcamp-data:/var/lib/postgresql/data
- depends_on:
- kestra:
- condition: service_started
-
- pgadmin:
- image: dpage/pgadmin4
- environment:
- - PGADMIN_DEFAULT_EMAIL=admin@admin.com
- - PGADMIN_DEFAULT_PASSWORD=root
- ports:
- - "8085:80"
- depends_on:
- postgres_zoomcamp:
+ kestra_postgres:
condition: service_started
+
\ No newline at end of file
diff --git a/02-workflow-orchestration/docker/kestra/docker-compose.yml b/02-workflow-orchestration/docker/kestra/docker-compose.yml
deleted file mode 100644
index db7da3760..000000000
--- a/02-workflow-orchestration/docker/kestra/docker-compose.yml
+++ /dev/null
@@ -1,62 +0,0 @@
-volumes:
- postgres-data:
- driver: local
- kestra-data:
- driver: local
-
-services:
- postgres:
- image: postgres
- volumes:
- - postgres-data:/var/lib/postgresql/data
- environment:
- POSTGRES_DB: kestra
- POSTGRES_USER: kestra
- POSTGRES_PASSWORD: k3str4
- healthcheck:
- test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"]
- interval: 30s
- timeout: 10s
- retries: 10
-
- kestra:
- image: kestra/kestra:v0.20.7
- pull_policy: always
- user: "root"
- command: server standalone
- volumes:
- - kestra-data:/app/storage
- - /var/run/docker.sock:/var/run/docker.sock
- - /tmp/kestra-wd:/tmp/kestra-wd
- environment:
- KESTRA_CONFIGURATION: |
- datasources:
- postgres:
- url: jdbc:postgresql://postgres:5432/kestra
- driverClassName: org.postgresql.Driver
- username: kestra
- password: k3str4
- kestra:
- server:
- basicAuth:
- enabled: false
- username: "admin@kestra.io" # it must be a valid email address
- password: kestra
- repository:
- type: postgres
- storage:
- type: local
- local:
- basePath: "/app/storage"
- queue:
- type: postgres
- tasks:
- tmpDir:
- path: /tmp/kestra-wd/tmp
- url: http://localhost:8080/
- ports:
- - "8080:8080"
- - "8081:8081"
- depends_on:
- postgres:
- condition: service_started
diff --git a/02-workflow-orchestration/docker/postgres/docker-compose.yml b/02-workflow-orchestration/docker/postgres/docker-compose.yml
deleted file mode 100644
index c2c032cb4..000000000
--- a/02-workflow-orchestration/docker/postgres/docker-compose.yml
+++ /dev/null
@@ -1,15 +0,0 @@
-version: "3.8"
-services:
- postgres:
- image: postgres
- container_name: postgres-db
- environment:
- POSTGRES_USER: kestra
- POSTGRES_PASSWORD: k3str4
- POSTGRES_DB: postgres-zoomcamp
- ports:
- - "5432:5432"
- volumes:
- - postgres-data:/var/lib/postgresql/data
-volumes:
- postgres-data:
\ No newline at end of file
diff --git a/02-workflow-orchestration/flows/01_hello_world.yaml b/02-workflow-orchestration/flows/01_hello_world.yaml
new file mode 100644
index 000000000..428fc1f9a
--- /dev/null
+++ b/02-workflow-orchestration/flows/01_hello_world.yaml
@@ -0,0 +1,48 @@
+id: getting_started
+namespace: zoomcamp
+
+inputs:
+ - id: name
+ type: STRING
+ defaults: Will
+
+concurrency:
+ behavior: FAIL
+ limit: 2
+
+variables:
+ welcome_message: "Hello, {{ inputs.name }}!"
+
+tasks:
+ - id: hello_message
+ type: io.kestra.plugin.core.log.Log
+ message: "{{ render(vars.welcome_message) }}"
+
+ - id: generate_output
+ type: io.kestra.plugin.core.debug.Return
+ format: I was generated during this workflow.
+
+ - id: sleep
+ type: io.kestra.plugin.core.flow.Sleep
+ duration: PT15S
+
+ - id: log_output
+ type: io.kestra.plugin.core.log.Log
+ message: "This is an output: {{ outputs.generate_output.value }}"
+
+ - id: goodbye_message
+ type: io.kestra.plugin.core.log.Log
+ message: "Goodbye, {{ inputs.name }}!"
+
+pluginDefaults:
+ - type: io.kestra.plugin.core.log.Log
+ values:
+ level: ERROR
+
+triggers:
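+  # Runs daily at 10:00; disabled below so importing the flow doesn't immediately start scheduling executions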
+ - id: schedule
+ type: io.kestra.plugin.core.trigger.Schedule
+ cron: "0 10 * * *"
+ inputs:
+ name: Sarah
+ disabled: true
diff --git a/02-workflow-orchestration/flows/02_python.yaml b/02-workflow-orchestration/flows/02_python.yaml
new file mode 100644
index 000000000..9522ab0b5
--- /dev/null
+++ b/02-workflow-orchestration/flows/02_python.yaml
@@ -0,0 +1,29 @@
+id: python_scripts
+namespace: zoomcamp
+
+description: This flow will install pip packages in a Docker container, and use Kestra's Python library to generate outputs (number of downloads of the Kestra Docker image) and metrics (duration of the script).
+
+tasks:
+ - id: collect_stats
+ type: io.kestra.plugin.scripts.python.Script
+ taskRunner:
+ type: io.kestra.plugin.scripts.runner.docker.Docker
+ containerImage: python:slim
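+      # pip packages installed into the container before the script runs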
+ dependencies:
+ - requests
+ - kestra
+ script: |
+ from kestra import Kestra
+ import requests
+ def get_docker_image_downloads(image_name: str = "kestra/kestra"):
+ """Queries the Docker Hub API to get the number of downloads for a specific Docker image."""
+ url = f"https://hub.docker.com/v2/repositories/{image_name}/"
+ response = requests.get(url)
+ data = response.json()
+ downloads = data.get('pull_count', 'Not available')
+ return downloads
+ downloads = get_docker_image_downloads()
+ outputs = {
+ 'downloads': downloads
+ }
+ Kestra.outputs(outputs)
\ No newline at end of file
diff --git a/02-workflow-orchestration/flows/01_getting_started_data_pipeline.yaml b/02-workflow-orchestration/flows/03_getting_started_data_pipeline.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/01_getting_started_data_pipeline.yaml
rename to 02-workflow-orchestration/flows/03_getting_started_data_pipeline.yaml
diff --git a/02-workflow-orchestration/flows/04_postgres_taxi.yaml b/02-workflow-orchestration/flows/04_postgres_taxi.yaml
new file mode 100644
index 000000000..89a6309c1
--- /dev/null
+++ b/02-workflow-orchestration/flows/04_postgres_taxi.yaml
@@ -0,0 +1,270 @@
+id: 04_postgres_taxi
+namespace: zoomcamp
+description: |
+ The CSV Data used in the course: https://github.com/DataTalksClub/nyc-tlc-data/releases
+
+inputs:
+ - id: taxi
+ type: SELECT
+ displayName: Select taxi type
+ values: [yellow, green]
+ defaults: yellow
+
+ - id: year
+ type: SELECT
+ displayName: Select year
+ values: ["2019", "2020"]
+ defaults: "2019"
+
+ - id: month
+ type: SELECT
+ displayName: Select month
+ values: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
+ defaults: "01"
+
+variables:
+ file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
+ staging_table: "public.{{inputs.taxi}}_tripdata_staging"
+ table: "public.{{inputs.taxi}}_tripdata"
+ data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"
+
+tasks:
+ - id: set_label
+ type: io.kestra.plugin.core.execution.Labels
+ labels:
+ file: "{{render(vars.file)}}"
+ taxi: "{{inputs.taxi}}"
+
+ - id: extract
+ type: io.kestra.plugin.scripts.shell.Commands
+ outputFiles:
+ - "*.csv"
+ taskRunner:
+ type: io.kestra.plugin.core.runner.Process
+ commands:
+ - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
+
+ - id: if_yellow_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'yellow'}}"
+ then:
+ - id: yellow_create_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ tpep_pickup_datetime timestamp,
+ tpep_dropoff_datetime timestamp,
+ passenger_count integer,
+ trip_distance double precision,
+ RatecodeID text,
+ store_and_fwd_flag text,
+ PULocationID text,
+ DOLocationID text,
+ payment_type integer,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ congestion_surcharge double precision
+ );
+
+ - id: yellow_create_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ tpep_pickup_datetime timestamp,
+ tpep_dropoff_datetime timestamp,
+ passenger_count integer,
+ trip_distance double precision,
+ RatecodeID text,
+ store_and_fwd_flag text,
+ PULocationID text,
+ DOLocationID text,
+ payment_type integer,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ congestion_surcharge double precision
+ );
+
+ - id: yellow_truncate_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ TRUNCATE TABLE {{render(vars.staging_table)}};
+
+ - id: yellow_copy_in_to_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.CopyIn
+ format: CSV
+ from: "{{render(vars.data)}}"
+ table: "{{render(vars.staging_table)}}"
+ header: true
+ columns: [VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge]
+
+ - id: yellow_add_unique_id_and_filename
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ UPDATE {{render(vars.staging_table)}}
+ SET
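+        -- hash the key trip attributes into a stable id so re-running a month stays idempotent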
+ unique_row_id = md5(
+ COALESCE(CAST(VendorID AS text), '') ||
+ COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
+ COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
+ COALESCE(PULocationID, '') ||
+ COALESCE(DOLocationID, '') ||
+ COALESCE(CAST(fare_amount AS text), '') ||
+ COALESCE(CAST(trip_distance AS text), '')
+ ),
+ filename = '{{render(vars.file)}}';
+
+ - id: yellow_merge_data
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ MERGE INTO {{render(vars.table)}} AS T
+ USING {{render(vars.staging_table)}} AS S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
+ passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
+ DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
+ improvement_surcharge, total_amount, congestion_surcharge
+ )
+ VALUES (
+ S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
+ S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
+ S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
+ S.improvement_surcharge, S.total_amount, S.congestion_surcharge
+ );
+
+ - id: if_green_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'green'}}"
+ then:
+ - id: green_create_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ lpep_pickup_datetime timestamp,
+ lpep_dropoff_datetime timestamp,
+ store_and_fwd_flag text,
+ RatecodeID text,
+ PULocationID text,
+ DOLocationID text,
+ passenger_count integer,
+ trip_distance double precision,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ ehail_fee double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ payment_type integer,
+ trip_type integer,
+ congestion_surcharge double precision
+ );
+
+ - id: green_create_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ lpep_pickup_datetime timestamp,
+ lpep_dropoff_datetime timestamp,
+ store_and_fwd_flag text,
+ RatecodeID text,
+ PULocationID text,
+ DOLocationID text,
+ passenger_count integer,
+ trip_distance double precision,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ ehail_fee double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ payment_type integer,
+ trip_type integer,
+ congestion_surcharge double precision
+ );
+
+ - id: green_truncate_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ TRUNCATE TABLE {{render(vars.staging_table)}};
+
+ - id: green_copy_in_to_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.CopyIn
+ format: CSV
+ from: "{{render(vars.data)}}"
+ table: "{{render(vars.staging_table)}}"
+ header: true
+ columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]
+
+ - id: green_add_unique_id_and_filename
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ UPDATE {{render(vars.staging_table)}}
+ SET
+ unique_row_id = md5(
+ COALESCE(CAST(VendorID AS text), '') ||
+ COALESCE(CAST(lpep_pickup_datetime AS text), '') ||
+ COALESCE(CAST(lpep_dropoff_datetime AS text), '') ||
+ COALESCE(PULocationID, '') ||
+ COALESCE(DOLocationID, '') ||
+ COALESCE(CAST(fare_amount AS text), '') ||
+ COALESCE(CAST(trip_distance AS text), '')
+ ),
+ filename = '{{render(vars.file)}}';
+
+ - id: green_merge_data
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ MERGE INTO {{render(vars.table)}} AS T
+ USING {{render(vars.staging_table)}} AS S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
+ store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
+ trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
+ improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
+ )
+ VALUES (
+ S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
+ S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
+ S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
+ S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
+ );
+
+ - id: purge_files
+ type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
+ description: This will remove output files. If you'd like to explore Kestra outputs, disable it.
+
+pluginDefaults:
+ - type: io.kestra.plugin.jdbc.postgresql
+ values:
+ url: jdbc:postgresql://pgdatabase:5432/ny_taxi
+ username: root
+ password: root
diff --git a/02-workflow-orchestration/flows/05_postgres_taxi_scheduled.yaml b/02-workflow-orchestration/flows/05_postgres_taxi_scheduled.yaml
new file mode 100644
index 000000000..23ec6d63f
--- /dev/null
+++ b/02-workflow-orchestration/flows/05_postgres_taxi_scheduled.yaml
@@ -0,0 +1,275 @@
+id: 02_postgres_taxi_scheduled
+namespace: zoomcamp
+description: |
+ Best to add a label `backfill:true` from the UI to track executions created via a backfill.
+ CSV data used here comes from: https://github.com/DataTalksClub/nyc-tlc-data/releases
+
+concurrency:
+ limit: 1
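+  # one execution at a time, so concurrent backfill runs don't clash on the shared staging table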
+
+inputs:
+ - id: taxi
+ type: SELECT
+ displayName: Select taxi type
+ values: [yellow, green]
+ defaults: yellow
+
+variables:
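+  # trigger.date is supplied by the Schedule trigger (or by the backfill being replayed), so each run targets its own month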
+ file: "{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy-MM')}}.csv"
+ staging_table: "public.{{inputs.taxi}}_tripdata_staging"
+ table: "public.{{inputs.taxi}}_tripdata"
+ data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ (trigger.date | date('yyyy-MM')) ~ '.csv']}}"
+
+tasks:
+ - id: set_label
+ type: io.kestra.plugin.core.execution.Labels
+ labels:
+ file: "{{render(vars.file)}}"
+ taxi: "{{inputs.taxi}}"
+
+ - id: extract
+ type: io.kestra.plugin.scripts.shell.Commands
+ outputFiles:
+ - "*.csv"
+ taskRunner:
+ type: io.kestra.plugin.core.runner.Process
+ commands:
+ - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
+
+ - id: if_yellow_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'yellow'}}"
+ then:
+ - id: yellow_create_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ tpep_pickup_datetime timestamp,
+ tpep_dropoff_datetime timestamp,
+ passenger_count integer,
+ trip_distance double precision,
+ RatecodeID text,
+ store_and_fwd_flag text,
+ PULocationID text,
+ DOLocationID text,
+ payment_type integer,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ congestion_surcharge double precision
+ );
+
+ - id: yellow_create_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ tpep_pickup_datetime timestamp,
+ tpep_dropoff_datetime timestamp,
+ passenger_count integer,
+ trip_distance double precision,
+ RatecodeID text,
+ store_and_fwd_flag text,
+ PULocationID text,
+ DOLocationID text,
+ payment_type integer,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ congestion_surcharge double precision
+ );
+
+ - id: yellow_truncate_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ TRUNCATE TABLE {{render(vars.staging_table)}};
+
+ - id: yellow_copy_in_to_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.CopyIn
+ format: CSV
+ from: "{{render(vars.data)}}"
+ table: "{{render(vars.staging_table)}}"
+ header: true
+ columns: [VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge]
+
+ - id: yellow_add_unique_id_and_filename
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ UPDATE {{render(vars.staging_table)}}
+ SET
+ unique_row_id = md5(
+ COALESCE(CAST(VendorID AS text), '') ||
+ COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
+ COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
+ COALESCE(PULocationID, '') ||
+ COALESCE(DOLocationID, '') ||
+ COALESCE(CAST(fare_amount AS text), '') ||
+ COALESCE(CAST(trip_distance AS text), '')
+ ),
+ filename = '{{render(vars.file)}}';
+
+ - id: yellow_merge_data
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ MERGE INTO {{render(vars.table)}} AS T
+ USING {{render(vars.staging_table)}} AS S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
+ passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
+ DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
+ improvement_surcharge, total_amount, congestion_surcharge
+ )
+ VALUES (
+ S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
+ S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
+ S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
+ S.improvement_surcharge, S.total_amount, S.congestion_surcharge
+ );
+
+ - id: if_green_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'green'}}"
+ then:
+ - id: green_create_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ lpep_pickup_datetime timestamp,
+ lpep_dropoff_datetime timestamp,
+ store_and_fwd_flag text,
+ RatecodeID text,
+ PULocationID text,
+ DOLocationID text,
+ passenger_count integer,
+ trip_distance double precision,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ ehail_fee double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ payment_type integer,
+ trip_type integer,
+ congestion_surcharge double precision
+ );
+
+ - id: green_create_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
+ unique_row_id text,
+ filename text,
+ VendorID text,
+ lpep_pickup_datetime timestamp,
+ lpep_dropoff_datetime timestamp,
+ store_and_fwd_flag text,
+ RatecodeID text,
+ PULocationID text,
+ DOLocationID text,
+ passenger_count integer,
+ trip_distance double precision,
+ fare_amount double precision,
+ extra double precision,
+ mta_tax double precision,
+ tip_amount double precision,
+ tolls_amount double precision,
+ ehail_fee double precision,
+ improvement_surcharge double precision,
+ total_amount double precision,
+ payment_type integer,
+ trip_type integer,
+ congestion_surcharge double precision
+ );
+
+ - id: green_truncate_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ TRUNCATE TABLE {{render(vars.staging_table)}};
+
+ - id: green_copy_in_to_staging_table
+ type: io.kestra.plugin.jdbc.postgresql.CopyIn
+ format: CSV
+ from: "{{render(vars.data)}}"
+ table: "{{render(vars.staging_table)}}"
+ header: true
+ columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]
+
+ - id: green_add_unique_id_and_filename
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ UPDATE {{render(vars.staging_table)}}
+ SET
+ unique_row_id = md5(
+ COALESCE(CAST(VendorID AS text), '') ||
+ COALESCE(CAST(lpep_pickup_datetime AS text), '') ||
+ COALESCE(CAST(lpep_dropoff_datetime AS text), '') ||
+ COALESCE(PULocationID, '') ||
+ COALESCE(DOLocationID, '') ||
+ COALESCE(CAST(fare_amount AS text), '') ||
+ COALESCE(CAST(trip_distance AS text), '')
+ ),
+ filename = '{{render(vars.file)}}';
+
+ - id: green_merge_data
+ type: io.kestra.plugin.jdbc.postgresql.Queries
+ sql: |
+ MERGE INTO {{render(vars.table)}} AS T
+ USING {{render(vars.staging_table)}} AS S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (
+ unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
+ store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
+ trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
+ improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
+ )
+ VALUES (
+ S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
+ S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
+ S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
+ S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
+ );
+
+ - id: purge_files
+ type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
+ description: To avoid cluttering your storage, we will remove the downloaded files
+
+pluginDefaults:
+ - type: io.kestra.plugin.jdbc.postgresql
+ values:
+ url: jdbc:postgresql://pgdatabase:5432/ny_taxi
+ username: root
+ password: root
+
+triggers:
+ - id: green_schedule
+ type: io.kestra.plugin.core.trigger.Schedule
+ cron: "0 9 1 * *"
+ inputs:
+ taxi: green
+
+ - id: yellow_schedule
+ type: io.kestra.plugin.core.trigger.Schedule
+ cron: "0 10 1 * *"
+ inputs:
+ taxi: yellow
diff --git a/02-workflow-orchestration/flows/06_gcp_kv.yaml b/02-workflow-orchestration/flows/06_gcp_kv.yaml
new file mode 100644
index 000000000..a6489d6b0
--- /dev/null
+++ b/02-workflow-orchestration/flows/06_gcp_kv.yaml
@@ -0,0 +1,28 @@
+id: 03_gcp_kv
+namespace: zoomcamp
+
+tasks:
+ - id: gcp_project_id
+ type: io.kestra.plugin.core.kv.Set
+ key: GCP_PROJECT_ID
+ kvType: STRING
+ value: kestra-sandbox # TODO replace with your project id
+
+ - id: gcp_location
+ type: io.kestra.plugin.core.kv.Set
+ key: GCP_LOCATION
+ kvType: STRING
+ value: europe-west2
+
+ - id: gcp_bucket_name
+ type: io.kestra.plugin.core.kv.Set
+ key: GCP_BUCKET_NAME
+ kvType: STRING
+ value: your-name-kestra # TODO make sure it's globally unique!
+
+ - id: gcp_dataset
+ type: io.kestra.plugin.core.kv.Set
+ key: GCP_DATASET
+ kvType: STRING
+ value: zoomcamp
+
diff --git a/02-workflow-orchestration/flows/07_gcp_setup.yaml b/02-workflow-orchestration/flows/07_gcp_setup.yaml
new file mode 100644
index 000000000..69fea5c38
--- /dev/null
+++ b/02-workflow-orchestration/flows/07_gcp_setup.yaml
@@ -0,0 +1,23 @@
+id: 04_gcp_setup
+namespace: zoomcamp
+
+tasks:
+ - id: create_gcs_bucket
+ type: io.kestra.plugin.gcp.gcs.CreateBucket
+ ifExists: SKIP
+ storageClass: REGIONAL
+ name: "{{kv('GCP_BUCKET_NAME')}}" # make sure it's globally unique!
+
+ - id: create_bq_dataset
+ type: io.kestra.plugin.gcp.bigquery.CreateDataset
+ name: "{{kv('GCP_DATASET')}}"
+ ifExists: SKIP
+
+pluginDefaults:
+ - type: io.kestra.plugin.gcp
+ values:
+ serviceAccount: "{{kv('GCP_CREDS')}}"
+ projectId: "{{kv('GCP_PROJECT_ID')}}"
+ location: "{{kv('GCP_LOCATION')}}"
+ bucket: "{{kv('GCP_BUCKET_NAME')}}"
+
diff --git a/02-workflow-orchestration/flows/08_gcp_taxi.yaml b/02-workflow-orchestration/flows/08_gcp_taxi.yaml
new file mode 100644
index 000000000..9af983179
--- /dev/null
+++ b/02-workflow-orchestration/flows/08_gcp_taxi.yaml
@@ -0,0 +1,249 @@
+id: 05_gcp_taxi
+namespace: zoomcamp
+description: |
+  The CSV data used in the course: https://github.com/DataTalksClub/nyc-tlc-data/releases
+
+inputs:
+ - id: taxi
+ type: SELECT
+ displayName: Select taxi type
+ values: [yellow, green]
+ defaults: green
+
+ - id: year
+ type: SELECT
+ displayName: Select year
+ values: ["2019", "2020"]
+ defaults: "2019"
+ allowCustomValue: true # allows you to type 2021 from the UI for the homework 🤗
+
+ - id: month
+ type: SELECT
+ displayName: Select month
+ values: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
+ defaults: "01"
+
+variables:
+ file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
+ gcs_file: "gs://{{kv('GCP_BUCKET_NAME')}}/{{vars.file}}"
+ table: "{{kv('GCP_DATASET')}}.{{inputs.taxi}}_tripdata_{{inputs.year}}_{{inputs.month}}"
+ data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"
+
+tasks:
+ - id: set_label
+ type: io.kestra.plugin.core.execution.Labels
+ labels:
+ file: "{{render(vars.file)}}"
+ taxi: "{{inputs.taxi}}"
+
+ - id: extract
+ type: io.kestra.plugin.scripts.shell.Commands
+ outputFiles:
+ - "*.csv"
+ taskRunner:
+ type: io.kestra.plugin.core.runner.Process
+ commands:
+ - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
+
+ - id: upload_to_gcs
+ type: io.kestra.plugin.gcp.gcs.Upload
+ from: "{{render(vars.data)}}"
+ to: "{{render(vars.gcs_file)}}"
+
+ - id: if_yellow_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'yellow'}}"
+ then:
+ - id: bq_yellow_tripdata
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.yellow_tripdata`
+ (
+ unique_row_id BYTES OPTIONS (description = 'A unique identifier for the trip, generated by hashing key trip attributes.'),
+ filename STRING OPTIONS (description = 'The source filename from which the trip data was loaded.'),
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ tpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ tpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ passenger_count INTEGER OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. TRUE = store and forward trip, FALSE = not a store and forward trip'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ PARTITION BY DATE(tpep_pickup_datetime);
+
+ - id: bq_yellow_table_ext
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
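+        -- the external table reads the CSV in place from GCS; nothing is loaded into BigQuery storage yet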
+ CREATE OR REPLACE EXTERNAL TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`
+ (
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ tpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ tpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ passenger_count INTEGER OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. TRUE = store and forward trip, FALSE = not a store and forward trip'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ OPTIONS (
+ format = 'CSV',
+ uris = ['{{render(vars.gcs_file)}}'],
+ skip_leading_rows = 1,
+ ignore_unknown_values = TRUE
+ );
+
+ - id: bq_yellow_table_tmp
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE OR REPLACE TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}`
+ AS
+ SELECT
+ MD5(CONCAT(
+ COALESCE(CAST(VendorID AS STRING), ""),
+ COALESCE(CAST(tpep_pickup_datetime AS STRING), ""),
+ COALESCE(CAST(tpep_dropoff_datetime AS STRING), ""),
+ COALESCE(CAST(PULocationID AS STRING), ""),
+ COALESCE(CAST(DOLocationID AS STRING), "")
+ )) AS unique_row_id,
+ "{{render(vars.file)}}" AS filename,
+ *
+ FROM `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`;
+
+ - id: bq_yellow_merge
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ MERGE INTO `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.yellow_tripdata` T
+ USING `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}` S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge)
+ VALUES (S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime, S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID, S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.improvement_surcharge, S.total_amount, S.congestion_surcharge);
+
+ - id: if_green_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'green'}}"
+ then:
+ - id: bq_green_tripdata
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.green_tripdata`
+ (
+ unique_row_id BYTES OPTIONS (description = 'A unique identifier for the trip, generated by hashing key trip attributes.'),
+ filename STRING OPTIONS (description = 'The source filename from which the trip data was loaded.'),
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ lpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ lpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ passenger_count INT64 OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ ehail_fee NUMERIC,
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ trip_type STRING OPTIONS (description = 'A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ PARTITION BY DATE(lpep_pickup_datetime);
+
+ - id: bq_green_table_ext
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE OR REPLACE EXTERNAL TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`
+ (
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ lpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ lpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ passenger_count INT64 OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ ehail_fee NUMERIC,
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ trip_type STRING OPTIONS (description = 'A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ OPTIONS (
+ format = 'CSV',
+ uris = ['{{render(vars.gcs_file)}}'],
+ skip_leading_rows = 1,
+ ignore_unknown_values = TRUE
+ );
+
+ - id: bq_green_table_tmp
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE OR REPLACE TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}`
+ AS
+ SELECT
+ MD5(CONCAT(
+ COALESCE(CAST(VendorID AS STRING), ""),
+ COALESCE(CAST(lpep_pickup_datetime AS STRING), ""),
+ COALESCE(CAST(lpep_dropoff_datetime AS STRING), ""),
+ COALESCE(CAST(PULocationID AS STRING), ""),
+ COALESCE(CAST(DOLocationID AS STRING), "")
+ )) AS unique_row_id,
+ "{{render(vars.file)}}" AS filename,
+ *
+ FROM `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`;
+
+ - id: bq_green_merge
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ MERGE INTO `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.green_tripdata` T
+ USING `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}` S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge)
+ VALUES (S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime, S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count, S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee, S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge);
+
+ - id: purge_files
+ type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
+      description: This task removes the execution's output files. If you'd like to explore Kestra outputs, disable it.
+ disabled: false
+
+pluginDefaults:
+ - type: io.kestra.plugin.gcp
+ values:
+ serviceAccount: "{{kv('GCP_CREDS')}}"
+ projectId: "{{kv('GCP_PROJECT_ID')}}"
+ location: "{{kv('GCP_LOCATION')}}"
+ bucket: "{{kv('GCP_BUCKET_NAME')}}"
+
diff --git a/02-workflow-orchestration/flows/09_gcp_taxi_scheduled.yaml b/02-workflow-orchestration/flows/09_gcp_taxi_scheduled.yaml
new file mode 100644
index 000000000..73a56c114
--- /dev/null
+++ b/02-workflow-orchestration/flows/09_gcp_taxi_scheduled.yaml
@@ -0,0 +1,250 @@
+id: 05_gcp_taxi_scheduled
+namespace: zoomcamp
+description: |
+ Best to add a label `backfill:true` from the UI to track executions created via a backfill.
+ CSV data used here comes from: https://github.com/DataTalksClub/nyc-tlc-data/releases
+
+inputs:
+ - id: taxi
+ type: SELECT
+ displayName: Select taxi type
+ values: [yellow, green]
+ defaults: green
+
+variables:
+ file: "{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy-MM')}}.csv"
+ gcs_file: "gs://{{kv('GCP_BUCKET_NAME')}}/{{vars.file}}"
+ table: "{{kv('GCP_DATASET')}}.{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy_MM')}}"
+ data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ (trigger.date | date('yyyy-MM')) ~ '.csv']}}"
+
+tasks:
+ - id: set_label
+ type: io.kestra.plugin.core.execution.Labels
+ labels:
+ file: "{{render(vars.file)}}"
+ taxi: "{{inputs.taxi}}"
+
+ - id: extract
+ type: io.kestra.plugin.scripts.shell.Commands
+ outputFiles:
+ - "*.csv"
+ taskRunner:
+ type: io.kestra.plugin.core.runner.Process
+ commands:
+ - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
+
+ - id: upload_to_gcs
+ type: io.kestra.plugin.gcp.gcs.Upload
+ from: "{{render(vars.data)}}"
+ to: "{{render(vars.gcs_file)}}"
+
+ - id: if_yellow_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'yellow'}}"
+ then:
+ - id: bq_yellow_tripdata
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.yellow_tripdata`
+ (
+ unique_row_id BYTES OPTIONS (description = 'A unique identifier for the trip, generated by hashing key trip attributes.'),
+ filename STRING OPTIONS (description = 'The source filename from which the trip data was loaded.'),
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ tpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ tpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ passenger_count INTEGER OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. TRUE = store and forward trip, FALSE = not a store and forward trip'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ PARTITION BY DATE(tpep_pickup_datetime);
+
+ - id: bq_yellow_table_ext
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE OR REPLACE EXTERNAL TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`
+ (
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ tpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ tpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ passenger_count INTEGER OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. TRUE = store and forward trip, FALSE = not a store and forward trip'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ OPTIONS (
+ format = 'CSV',
+ uris = ['{{render(vars.gcs_file)}}'],
+ skip_leading_rows = 1,
+ ignore_unknown_values = TRUE
+ );
+
+ - id: bq_yellow_table_tmp
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE OR REPLACE TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}`
+ AS
+ SELECT
+ MD5(CONCAT(
+ COALESCE(CAST(VendorID AS STRING), ""),
+ COALESCE(CAST(tpep_pickup_datetime AS STRING), ""),
+ COALESCE(CAST(tpep_dropoff_datetime AS STRING), ""),
+ COALESCE(CAST(PULocationID AS STRING), ""),
+ COALESCE(CAST(DOLocationID AS STRING), "")
+ )) AS unique_row_id,
+ "{{render(vars.file)}}" AS filename,
+ *
+ FROM `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`;
+
+ - id: bq_yellow_merge
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ MERGE INTO `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.yellow_tripdata` T
+ USING `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}` S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge)
+ VALUES (S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime, S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID, S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.improvement_surcharge, S.total_amount, S.congestion_surcharge);
+
+ - id: if_green_taxi
+ type: io.kestra.plugin.core.flow.If
+ condition: "{{inputs.taxi == 'green'}}"
+ then:
+ - id: bq_green_tripdata
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.green_tripdata`
+ (
+ unique_row_id BYTES OPTIONS (description = 'A unique identifier for the trip, generated by hashing key trip attributes.'),
+ filename STRING OPTIONS (description = 'The source filename from which the trip data was loaded.'),
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ lpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ lpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ passenger_count INT64 OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ ehail_fee NUMERIC,
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ trip_type STRING OPTIONS (description = 'A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ PARTITION BY DATE(lpep_pickup_datetime);
+
+ - id: bq_green_table_ext
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE OR REPLACE EXTERNAL TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`
+ (
+ VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
+ lpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
+ lpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
+ store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip'),
+ RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
+ PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
+ DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
+ passenger_count INT64 OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
+ trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
+ fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
+ extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
+ mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
+ tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
+ tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
+ ehail_fee NUMERIC,
+ improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
+ total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
+ payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
+ trip_type STRING OPTIONS (description = 'A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch'),
+ congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
+ )
+ OPTIONS (
+ format = 'CSV',
+ uris = ['{{render(vars.gcs_file)}}'],
+ skip_leading_rows = 1,
+ ignore_unknown_values = TRUE
+ );
+
+ - id: bq_green_table_tmp
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ CREATE OR REPLACE TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}`
+ AS
+ SELECT
+ MD5(CONCAT(
+ COALESCE(CAST(VendorID AS STRING), ""),
+ COALESCE(CAST(lpep_pickup_datetime AS STRING), ""),
+ COALESCE(CAST(lpep_dropoff_datetime AS STRING), ""),
+ COALESCE(CAST(PULocationID AS STRING), ""),
+ COALESCE(CAST(DOLocationID AS STRING), "")
+ )) AS unique_row_id,
+ "{{render(vars.file)}}" AS filename,
+ *
+ FROM `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`;
+
+ - id: bq_green_merge
+ type: io.kestra.plugin.gcp.bigquery.Query
+ sql: |
+ MERGE INTO `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.green_tripdata` T
+ USING `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}` S
+ ON T.unique_row_id = S.unique_row_id
+ WHEN NOT MATCHED THEN
+ INSERT (unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge)
+ VALUES (S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime, S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count, S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee, S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge);
+
+ - id: purge_files
+ type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
+ description: To avoid cluttering your storage, we will remove the downloaded files
+
+pluginDefaults:
+ - type: io.kestra.plugin.gcp
+ values:
+ serviceAccount: "{{kv('GCP_CREDS')}}"
+ projectId: "{{kv('GCP_PROJECT_ID')}}"
+ location: "{{kv('GCP_LOCATION')}}"
+ bucket: "{{kv('GCP_BUCKET_NAME')}}"
+
+triggers:
+ - id: green_schedule
+ type: io.kestra.plugin.core.trigger.Schedule
+ cron: "0 9 1 * *"
+ inputs:
+ taxi: green
+
+ - id: yellow_schedule
+ type: io.kestra.plugin.core.trigger.Schedule
+ cron: "0 10 1 * *"
+ inputs:
+ taxi: yellow
+
diff --git a/02-workflow-orchestration/flows/10_chat_without_rag.yaml b/02-workflow-orchestration/flows/10_chat_without_rag.yaml
new file mode 100644
index 000000000..369307fb7
--- /dev/null
+++ b/02-workflow-orchestration/flows/10_chat_without_rag.yaml
@@ -0,0 +1,37 @@
+id: 06_chat_without_rag
+namespace: zoomcamp
+
+description: |
+ This flow demonstrates what happens when you query an LLM WITHOUT RAG.
+ The model can only rely on its training data, which may be outdated or incomplete.
+
+  After running this, check out 11_chat_with_rag.yaml to see how RAG fixes these issues.
+
+tasks:
+ - id: chat_without_rag
+ type: io.kestra.plugin.ai.completion.ChatCompletion
+ description: Query about Kestra 1.1 features WITHOUT RAG
+ provider:
+ type: io.kestra.plugin.ai.provider.GoogleGemini
+ modelName: gemini-2.5-flash
+ apiKey: "{{ kv('GEMINI_API_KEY') }}"
+ messages:
+ - type: USER
+ content: |
+ Which features were released in Kestra 1.1?
+ Please list at least 5 major features with brief descriptions.
+
+ - id: log_results
+ type: io.kestra.plugin.core.log.Log
+ message: |
+ ❌ Response WITHOUT RAG (no retrieved context):
+ {{ outputs.chat_without_rag.textOutput }}
+
+ 🤔 Did you notice that this response seems to be:
+ - Incorrect
+ - Vague/generic
+      - Listing features that were not introduced in this version, but added long ago
+
+      👉 This is why context matters. Run `11_chat_with_rag.yaml` to see the accurate, context-grounded response.
+
+
diff --git a/02-workflow-orchestration/flows/11_chat_with_rag.yaml b/02-workflow-orchestration/flows/11_chat_with_rag.yaml
new file mode 100644
index 000000000..14675a65e
--- /dev/null
+++ b/02-workflow-orchestration/flows/11_chat_with_rag.yaml
@@ -0,0 +1,52 @@
+id: 07_chat_with_rag
+namespace: zoomcamp
+
+description: |
+ This flow demonstrates RAG (Retrieval Augmented Generation) by ingesting Kestra release documentation and using it to answer questions accurately.
+
+  Compare this with 10_chat_without_rag.yaml to see the difference RAG makes.
+
+tasks:
+ - id: ingest_release_notes
+ type: io.kestra.plugin.ai.rag.IngestDocument
+ description: Ingest Kestra 1.1 release notes to create embeddings
+ provider:
+ type: io.kestra.plugin.ai.provider.GoogleGemini
+ modelName: gemini-embedding-001
+ apiKey: "{{ kv('GEMINI_API_KEY') }}"
+ embeddings:
+ type: io.kestra.plugin.ai.embeddings.KestraKVStore
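+      # drop: true rebuilds the KV-store embeddings from scratch on every run, so re-ingestion starts clean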
+ drop: true
+ fromExternalURLs:
+ - https://raw.githubusercontent.com/kestra-io/docs/refs/heads/main/content/blogs/release-1-1.md
+
+ - id: chat_with_rag
+ type: io.kestra.plugin.ai.rag.ChatCompletion
+ description: Query about Kestra 1.1 features with RAG context
+ chatProvider:
+ type: io.kestra.plugin.ai.provider.GoogleGemini
+ modelName: gemini-2.5-flash
+ apiKey: "{{ kv('GEMINI_API_KEY') }}"
+ embeddingProvider:
+ type: io.kestra.plugin.ai.provider.GoogleGemini
+ modelName: gemini-embedding-001
+ apiKey: "{{ kv('GEMINI_API_KEY') }}"
+ embeddings:
+ type: io.kestra.plugin.ai.embeddings.KestraKVStore
+ systemMessage: |
+ You are a helpful assistant that answers questions about Kestra.
+ Use the provided documentation to give accurate, specific answers.
+ If you don't find the information in the context, say so.
+ prompt: |
+ Which features were released in Kestra 1.1?
+ Please list at least 5 major features with brief descriptions.
+
+ - id: log_results
+ type: io.kestra.plugin.core.log.Log
+ message: |
+ ✅ RAG Response (with retrieved context):
+ {{ outputs.chat_with_rag.textOutput }}
+
+      Note that this response is detailed, accurate, and grounded in the actual release documentation. Compare it with the output from 10_chat_without_rag.yaml.
+
+
diff --git a/cohorts/2025/02-workflow-orchestration/README.md b/cohorts/2025/02-workflow-orchestration/README.md
new file mode 100644
index 000000000..26d5d8a08
--- /dev/null
+++ b/cohorts/2025/02-workflow-orchestration/README.md
@@ -0,0 +1,451 @@
+# Workflow Orchestration
+
+Welcome to Module 2 of the Data Engineering Zoomcamp! This week, we’ll dive into workflow orchestration using [Kestra](https://go.kestra.io/de-zoomcamp/github).
+
+Kestra is an open-source, event-driven orchestration platform that simplifies building both scheduled and event-driven workflows. By adopting Infrastructure as Code practices for data and process orchestration, Kestra enables you to build reliable workflows with just a few lines of YAML.
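+
+To give a sense of how little YAML a flow needs, here is a minimal, hypothetical example (the flow id and message are invented for illustration; the `Log` task type is the same one used by the flows in this module):
+
+```yaml
+id: hello_zoomcamp
+namespace: zoomcamp
+
+tasks:
+  - id: say_hello
+    type: io.kestra.plugin.core.log.Log
+    message: Hello from your first Kestra flow!
+```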
+
+> [!NOTE]
+> You can find all videos for this week in this [YouTube Playlist](https://go.kestra.io/de-zoomcamp/yt-playlist).
+
+---
+
+# Course Structure
+
+## 1. Conceptual Material: Introduction to Orchestration and Kestra
+
+In this section, you’ll learn the foundations of workflow orchestration, its importance, and how Kestra fits into the orchestration landscape.
+
+### Videos
+- **[2.2.1 - Introduction to Workflow Orchestration](https://youtu.be/Np6QmmcgLCs)**
+- **[2.2.2 - Learn the Concepts of Kestra](https://youtu.be/o79n-EVpics)**
+
+### Resources
+- [Quickstart Guide](https://go.kestra.io/de-zoomcamp/quickstart)
+- [Install Kestra with Docker Compose](https://go.kestra.io/de-zoomcamp/docker-compose)
+- [Tutorial](https://go.kestra.io/de-zoomcamp/tutorial)
+- [What is an Orchestrator?](https://go.kestra.io/de-zoomcamp/what-is-an-orchestrator)
+
+---
+
+## 2. Hands-On Coding Project: Build Data Pipelines with Kestra
+
+This week, we'll build ETL pipelines for Yellow and Green Taxi data from NYC's Taxi and Limousine Commission (TLC). You will:
+1. Extract data from [CSV files](https://github.com/DataTalksClub/nyc-tlc-data/releases).
+2. Load it into Postgres or Google Cloud (GCS + BigQuery).
+3. Explore scheduling and backfilling workflows.
+
+> [!NOTE]
+> If you're using the PostgreSQL and pgAdmin Docker setup from Module 1 for this week's Kestra workflow orchestration exercises, make sure your PostgreSQL image version is 15 or later (preferably the latest). The `MERGE` statement, introduced in PostgreSQL 15, won't work on earlier versions and will cause syntax errors in your Kestra flows.
+
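+To make the requirement concrete, here is a minimal sketch of the kind of `MERGE` the flows run, embedded in a hypothetical Kestra Postgres `Query` task; the connection settings, table and column names are illustrative, not the exact ones from the course flows:
+
+```yaml
+- id: merge_green
+  type: io.kestra.plugin.jdbc.postgresql.Query
+  url: jdbc:postgresql://host.docker.internal:5432/postgres-zoomcamp   # illustrative connection settings
+  username: kestra
+  password: k3str4
+  sql: |
+    -- insert only rows that are not already in the destination (requires PostgreSQL 15+)
+    MERGE INTO green_tripdata AS t            -- final destination table
+    USING green_tripdata_2019_01 AS s         -- freshly loaded monthly table
+    ON t.unique_row_id = s.unique_row_id
+    WHEN NOT MATCHED THEN
+      INSERT (unique_row_id, filename)
+      VALUES (s.unique_row_id, s.filename);
+```
+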
+### File Structure
+
+The project is organized as follows:
+```
+.
+├── flows/
+│ ├── 01_getting_started_data_pipeline.yaml
+│ ├── 02_postgres_taxi.yaml
+│ ├── 02_postgres_taxi_scheduled.yaml
+│ ├── 03_postgres_dbt.yaml
+│ ├── 04_gcp_kv.yaml
+│ ├── 05_gcp_setup.yaml
+│ ├── 06_gcp_taxi.yaml
+│ ├── 06_gcp_taxi_scheduled.yaml
+│ └── 07_gcp_dbt.yaml
+```
+
+### Setup Kestra
+
+We'll set up Kestra with Docker Compose, which starts one container for the Kestra server and another for its Postgres database:
+
+```bash
+cd 02-workflow-orchestration/docker/combined
+docker compose up -d
+```
+
+Once the containers start, you can access the Kestra UI at [http://localhost:8080](http://localhost:8080).
+
+If you prefer to add flows programmatically using Kestra's API, run the following commands:
+
+```bash
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/01_getting_started_data_pipeline.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_postgres_taxi.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_postgres_taxi_scheduled.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/03_postgres_dbt.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/04_gcp_kv.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/05_gcp_setup.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_taxi.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_taxi_scheduled.yaml
+curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/07_gcp_dbt.yaml
+```
+
+---
+
+## 3. ETL Pipelines in Kestra: Detailed Walkthrough
+
+### Getting Started Pipeline
+
+This introductory flow demonstrates a simple data pipeline: it extracts data via an HTTP REST API, transforms that data in Python, and then queries it using DuckDB. For this stage, a separate Postgres database is created for the exercises.
+
+**Note:** Check that `pgAdmin` isn't running on the same ports as Kestra. If it is, check out the [FAQ](#troubleshooting-tips) at the bottom of this README.
+
+### Videos
+
+- **[2.2.3 - Create an ETL Pipeline with Postgres in Kestra](https://youtu.be/OkfLX28Ecjg?si=vKbIyWo1TtjpNnvt)**
+- **[2.2.4 - Manage Scheduling and Backfills using Postgres in Kestra](https://youtu.be/_-li_z97zog?si=G6jZbkfJb3GAyqrd)**
+- **[2.2.5 - Transform Data with dbt and Postgres in Kestra](https://youtu.be/ZLp2N6p2JjE?si=tWhcvq5w4lO8v1_p)**
+
+
+```mermaid
+graph LR
+ Extract[Extract Data via HTTP REST API] --> Transform[Transform Data in Python]
+ Transform --> Query[Query Data with DuckDB]
+```
+
+Add the flow [`01_getting_started_data_pipeline.yaml`](flows/01_getting_started_data_pipeline.yaml) from the UI if you haven't already and execute it to see the results. Inspect the Gantt and Logs tabs to understand the flow execution.
+
+### Local DB: Load Taxi Data to Postgres
+
+Before we start loading data to GCP, we'll first play with the Yellow and Green Taxi data using a local Postgres database running in a Docker container. We'll create a new Postgres database for these examples using this [Docker Compose file](docker/postgres/docker-compose.yml). Download it into a new directory, navigate to it and run the following command to start it:
+
+```bash
+docker compose up -d
+```
+
+The flow will extract CSV data partitioned by year and month, create tables, load the data into a monthly table, and finally merge it into the final destination table.
+
+```mermaid
+graph LR
+ Start[Select Year & Month] --> SetLabel[Set Labels]
+ SetLabel --> Extract[Extract CSV Data]
+ Extract -->|Taxi=Yellow| YellowFinalTable[Create Yellow Final Table]:::yellow
+ Extract -->|Taxi=Green| GreenFinalTable[Create Green Final Table]:::green
+ YellowFinalTable --> YellowMonthlyTable[Create Yellow Monthly Table]:::yellow
+ GreenFinalTable --> GreenMonthlyTable[Create Green Monthly Table]:::green
+ YellowMonthlyTable --> YellowCopyIn[Load Data to Monthly Table]:::yellow
+ GreenMonthlyTable --> GreenCopyIn[Load Data to Monthly Table]:::green
+ YellowCopyIn --> YellowMerge[Merge Yellow Data]:::yellow
+ GreenCopyIn --> GreenMerge[Merge Green Data]:::green
+
+ classDef yellow fill:#FFD700,stroke:#000,stroke-width:1px;
+ classDef green fill:#32CD32,stroke:#000,stroke-width:1px;
+```
+
+The flow code: [`02_postgres_taxi.yaml`](flows/02_postgres_taxi.yaml).
+
+
+> [!NOTE]
+> The NYC Taxi and Limousine Commission (TLC) Trip Record Data provided on the [nyc.gov](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) website is currently available only in Parquet format, but this is NOT the dataset we're going to use in this course. For the purpose of this course, we'll use the **CSV files** available [here on GitHub](https://github.com/DataTalksClub/nyc-tlc-data/releases). This is because the Parquet format can be challenging for newcomers to understand, and we want to make the course as accessible as possible; the CSV format can be easily inspected with tools like Excel, Google Sheets, or even a simple text editor.
+
+### Local DB: Learn Scheduling and Backfills
+
+We can now schedule the same pipeline shown above to run daily at 9 AM UTC. We'll also demonstrate how to backfill the data pipeline to run on historical data.
+
+Note: given the size of the full dataset, we'll backfill only the green taxi data for the year 2019.
+
+The flow code: [`02_postgres_taxi_scheduled.yaml`](flows/02_postgres_taxi_scheduled.yaml).
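+
+As a rough sketch, such a trigger looks like this in the flow (assuming the core `Schedule` trigger; see the actual flow for the full configuration):
+
+```yaml
+triggers:
+  - id: green_schedule
+    type: io.kestra.plugin.core.trigger.Schedule
+    cron: "0 9 * * *"   # every day at 9 AM UTC
+    inputs:
+      taxi: green       # run the pipeline for the green dataset
+```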
+
+### Local DB: Orchestrate dbt Models (Optional)
+
+Now that we have raw data ingested into a local Postgres database, we can use dbt to transform the data into meaningful insights. The flow will sync the dbt models from Git to Kestra and run the `dbt build` command to build the models.
+
+```mermaid
+graph LR
+ Start[Select dbt command] --> Sync[Sync Namespace Files]
+ Sync --> DbtBuild[Run dbt CLI]
+```
+
+This is only a quick showcase of dbt inside Kestra, so the homework tasks don't depend on it. The course covers dbt in more detail in [Week 4](../04-analytics-engineering).
+
+The flow code: [`03_postgres_dbt.yaml`](flows/03_postgres_dbt.yaml).
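+
+As a rough sketch, the flow combines two tasks along these lines (the repository URL is a placeholder and the container image is an assumption; see the actual flow for the real configuration):
+
+```yaml
+tasks:
+  - id: sync
+    type: io.kestra.plugin.git.SyncNamespaceFiles
+    url: https://github.com/your-org/your-dbt-project   # placeholder repository
+    branch: main
+    namespace: "{{ flow.namespace }}"
+
+  - id: dbt_build
+    type: io.kestra.plugin.dbt.cli.DbtCLI
+    containerImage: ghcr.io/kestra-io/dbt-postgres:latest   # assumed image
+    commands:
+      - dbt build
+```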
+
+### Resources
+- [pgAdmin Download](https://www.pgadmin.org/download/)
+- [Postgres DB Docker Compose](docker/postgres/docker-compose.yml)
+
+---
+
+## 4. ETL Pipelines in Kestra: Google Cloud Platform
+
+Now that you've learned how to build ETL pipelines locally using Postgres, we are ready to move to the cloud. In this section, we'll load the same Yellow and Green Taxi data to Google Cloud Platform (GCP) using:
+1. Google Cloud Storage (GCS) as a data lake
+2. BigQuery as a data warehouse
+
+### Videos
+
+- **[2.2.6 - Create an ETL Pipeline with GCS and BigQuery in Kestra](https://youtu.be/nKqjjLJ7YXs)**
+- **[2.2.7 - Manage Scheduling and Backfills using BigQuery in Kestra](https://youtu.be/DoaZ5JWEkH0)**
+- **[2.2.8 - Transform Data with dbt and BigQuery in Kestra](https://youtu.be/eF_EdV4A1Wk)**
+
+### Setup Google Cloud Platform (GCP)
+
+Before we start loading data to GCP, we need to set up the Google Cloud Platform.
+
+First, adjust the following flow [`04_gcp_kv.yaml`](flows/04_gcp_kv.yaml) to include your service account, GCP project ID, BigQuery dataset and GCS bucket name (_along with their location_) as KV Store values:
+- GCP_CREDS
+- GCP_PROJECT_ID
+- GCP_LOCATION
+- GCP_BUCKET_NAME
+- GCP_DATASET
+
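+A KV entry can be set from a flow with a task roughly like this (a sketch assuming the core `kv.Set` task; the value is a placeholder):
+
+```yaml
+- id: set_gcp_project_id
+  type: io.kestra.plugin.core.kv.Set
+  key: GCP_PROJECT_ID
+  value: your-gcp-project-id   # placeholder, replace with your project ID
+```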
+
+> [!WARNING]
+> The `GCP_CREDS` service account key contains sensitive information. Keep it as secure as your passwords and never commit it to Git.
+
+### Create GCP Resources
+
+If you haven't already created the GCS bucket and BigQuery dataset in the first week of the course, you can use this flow to create them: [`05_gcp_setup.yaml`](flows/05_gcp_setup.yaml).
+
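+As a rough sketch, the setup flow creates both resources with tasks along these lines (property names assumed from the GCP plugin; `ifExists: SKIP` keeps reruns idempotent):
+
+```yaml
+tasks:
+  - id: create_gcs_bucket
+    type: io.kestra.plugin.gcp.gcs.CreateBucket
+    ifExists: SKIP                          # don't fail if the bucket already exists
+    name: "{{ kv('GCP_BUCKET_NAME') }}"
+
+  - id: create_bq_dataset
+    type: io.kestra.plugin.gcp.bigquery.CreateDataset
+    ifExists: SKIP                          # don't fail if the dataset already exists
+    name: "{{ kv('GCP_DATASET') }}"
+```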
+
+### GCP Workflow: Load Taxi Data to BigQuery
+
+```mermaid
+graph LR
+ SetLabel[Set Labels] --> Extract[Extract CSV Data]
+ Extract --> UploadToGCS[Upload Data to GCS]
+ UploadToGCS -->|Taxi=Yellow| BQYellowTripdata[Main Yellow Tripdata Table]:::yellow
+ UploadToGCS -->|Taxi=Green| BQGreenTripdata[Main Green Tripdata Table]:::green
+ BQYellowTripdata --> BQYellowTableExt[External Table]:::yellow
+ BQGreenTripdata --> BQGreenTableExt[External Table]:::green
+ BQYellowTableExt --> BQYellowTableTmp[Monthly Table]:::yellow
+ BQGreenTableExt --> BQGreenTableTmp[Monthly Table]:::green
+ BQYellowTableTmp --> BQYellowMerge[Merge to Main Table]:::yellow
+ BQGreenTableTmp --> BQGreenMerge[Merge to Main Table]:::green
+ BQYellowMerge --> PurgeFiles[Purge Files]
+ BQGreenMerge --> PurgeFiles[Purge Files]
+
+ classDef yellow fill:#FFD700,stroke:#000,stroke-width:1px;
+ classDef green fill:#32CD32,stroke:#000,stroke-width:1px;
+```
+
+The flow code: [`06_gcp_taxi.yaml`](flows/06_gcp_taxi.yaml).
+
+### GCP Workflow: Schedule and Backfill Full Dataset
+
+We can now schedule the same pipeline shown above to run daily at 9 AM UTC for the green dataset and at 10 AM UTC for the yellow dataset. You can backfill historical data directly from the Kestra UI.
+
+Since we now process data in a cloud environment with scalable storage and compute, we can backfill the entire dataset for both the yellow and green taxi data without the risk of running out of resources on our local machine.
+
+The flow code: [`06_gcp_taxi_scheduled.yaml`](flows/06_gcp_taxi_scheduled.yaml).
+
+### GCP Workflow: Orchestrate dbt Models (Optional)
+
+Now that we have raw data ingested into BigQuery, we can use dbt to transform that data. The flow will sync the dbt models from Git to Kestra and run the `dbt build` command to build the models:
+
+```mermaid
+graph LR
+ Start[Select dbt command] --> Sync[Sync Namespace Files]
+ Sync --> Build[Run dbt Build Command]
+```
+
+This is only a quick showcase of dbt inside Kestra, so the homework tasks don't depend on it. The course covers dbt in more detail in [Week 4](../04-analytics-engineering).
+
+The flow code: [`07_gcp_dbt.yaml`](flows/07_gcp_dbt.yaml).
+
+---
+
+## 5. Bonus: Deploy to the Cloud (Optional)
+
+Now that we've got our ETL pipeline working both locally and in the cloud, we can deploy Kestra to the cloud so that it keeps orchestrating our pipelines monthly on the configured schedules. We'll cover how to install Kestra on Google Cloud in production and how to automatically sync and deploy your workflows from a Git repository.
+
+Note: when committing your workflows to Git, make sure they don't contain any sensitive information. You can use [Secrets](https://go.kestra.io/de-zoomcamp/secret) and the [KV Store](https://go.kestra.io/de-zoomcamp/kv-store) to keep sensitive data out of your workflow logic.
+
+### Videos
+
+- **[2.2.9 - Deploy Workflows to the Cloud with Git](https://youtu.be/l-wC71tI3co)**
+
+### Resources
+
+- [Install Kestra on Google Cloud](https://go.kestra.io/de-zoomcamp/gcp-install)
+- [Moving from Development to Production](https://go.kestra.io/de-zoomcamp/dev-to-prod)
+- [Using Git in Kestra](https://go.kestra.io/de-zoomcamp/git)
+- [Deploy Flows with GitHub Actions](https://go.kestra.io/de-zoomcamp/deploy-github-actions)
+
+## 6. Additional Resources 📚
+
+- Check [Kestra Docs](https://go.kestra.io/de-zoomcamp/docs)
+- Explore our [Blueprints](https://go.kestra.io/de-zoomcamp/blueprints) library
+- Browse over 600 [plugins](https://go.kestra.io/de-zoomcamp/plugins) available in Kestra
+- Give us a star on [GitHub](https://go.kestra.io/de-zoomcamp/github)
+- Join our [Slack community](https://go.kestra.io/de-zoomcamp/slack) if you have any questions
+- Find all the videos in this [YouTube Playlist](https://go.kestra.io/de-zoomcamp/yt-playlist)
+
+
+### Troubleshooting tips
+
+If you face any issues with Kestra flows in Module 2, make sure to use the following Docker images/ports:
+- Use `kestra/kestra:latest` (the latest stable release) rather than `kestra/kestra:develop`, a bleeding-edge development build that may contain bugs
+- Use a Postgres image that runs **PostgreSQL 15** or higher, e.g. `postgres:latest`
+- If `pgAdmin` or anything else is already running on port 8080, adjust the Kestra docker-compose port mapping, e.g. to 18080, and access the Kestra UI at http://localhost:18080/ instead of http://localhost:8080/
+
+If you're using Linux, you might encounter `Connection Refused` errors when connecting to the Postgres DB from within Kestra, because `host.docker.internal` works differently on Linux. Using the modified Docker Compose file below, you can run Kestra, its dedicated Postgres DB, and the Postgres DB for the exercises all together. Within Kestra, refer to the container name `postgres_zoomcamp` instead of `host.docker.internal` in `pluginDefaults`; the same applies to pgAdmin. If you'd prefer to keep separate Docker Compose files, you'll need to set up a Docker network so the containers can communicate with each other.
+
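+For example, a flow's `pluginDefaults` can point at the container name directly (a sketch; the credentials here match the Compose file below):
+
+```yaml
+pluginDefaults:
+  - type: io.kestra.plugin.jdbc.postgresql   # applies to all Postgres JDBC tasks in the flow
+    values:
+      url: jdbc:postgresql://postgres_zoomcamp:5432/postgres-zoomcamp
+      username: kestra
+      password: k3str4
+```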
+
+**Docker Compose Example**
+
+This Docker Compose file has the Zoomcamp DB container and the pgAdmin container added to it, so everything lives in one file.
+
+Changes include:
+- A new `volume` for the Zoomcamp DB container
+- The Zoomcamp DB container is added and renamed to prevent clashes with the Kestra DB container
+- A `depends_on` condition is added so the Zoomcamp DB starts only after Kestra is running
+- pgAdmin is added on port 8085 so it doesn't clash with Kestra, which uses 8080 and 8081
+
+```yaml
+volumes:
+ postgres-data:
+ driver: local
+ kestra-data:
+ driver: local
+ zoomcamp-data:
+ driver: local
+
+services:
+ postgres:
+ image: postgres
+ volumes:
+ - postgres-data:/var/lib/postgresql/data
+ environment:
+ POSTGRES_DB: kestra
+ POSTGRES_USER: kestra
+ POSTGRES_PASSWORD: k3str4
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"]
+ interval: 30s
+ timeout: 10s
+ retries: 10
+
+ kestra:
+ image: kestra/kestra:latest
+ pull_policy: always
+ # Note that this setup with a root user is intended for development purpose.
+ # Our base image runs without root, but the Docker Compose implementation needs root to access the Docker socket
+ # To run Kestra in a rootless mode in production, see: https://kestra.io/docs/installation/podman-compose
+ user: "root"
+ command: server standalone
+ volumes:
+ - kestra-data:/app/storage
+ - /var/run/docker.sock:/var/run/docker.sock
+ - /tmp/kestra-wd:/tmp/kestra-wd
+ environment:
+ KESTRA_CONFIGURATION: |
+ datasources:
+ postgres:
+ url: jdbc:postgresql://postgres:5432/kestra
+ driverClassName: org.postgresql.Driver
+ username: kestra
+ password: k3str4
+ kestra:
+ server:
+ basicAuth:
+ enabled: false
+ username: "admin@kestra.io" # it must be a valid email address
+ password: kestra
+ repository:
+ type: postgres
+ storage:
+ type: local
+ local:
+ basePath: "/app/storage"
+ queue:
+ type: postgres
+ tasks:
+ tmpDir:
+ path: /tmp/kestra-wd/tmp
+ url: http://localhost:8080/
+ ports:
+ - "8080:8080"
+ - "8081:8081"
+ depends_on:
+ postgres:
+ condition: service_started
+
+ postgres_zoomcamp:
+ image: postgres
+ environment:
+ POSTGRES_USER: kestra
+ POSTGRES_PASSWORD: k3str4
+ POSTGRES_DB: postgres-zoomcamp
+ ports:
+ - "5432:5432"
+ volumes:
+ - zoomcamp-data:/var/lib/postgresql/data
+ depends_on:
+ kestra:
+ condition: service_started
+
+ pgadmin:
+ image: dpage/pgadmin4
+ environment:
+ - PGADMIN_DEFAULT_EMAIL=admin@admin.com
+ - PGADMIN_DEFAULT_PASSWORD=root
+ ports:
+ - "8085:80"
+ depends_on:
+ postgres_zoomcamp:
+ condition: service_started
+```
+
+
+
+If you are still facing issues, stop and remove your existing Kestra and Postgres containers and start them again with `docker compose up -d`. If this doesn't help, post your question on the DataTalksClub Slack or on [Kestra's Slack](http://kestra.io/slack).
+
+- **[DE Zoomcamp FAQ - PostgresDB Setup and Installing pgAdmin](https://youtu.be/ywAPYNYFaB4?si=5X9AD0nFAT2WLWgS)**
+- **[DE Zoomcamp FAQ - Port and Images](https://youtu.be/l2M2mW76RIU?si=oqyZ7KUaI27vi90V)**
+- **[DE Zoomcamp FAQ - Docker Setup](https://youtu.be/73g6qJN0HcM)**
+
+
+
+If you encounter an error similar to:
+```
+BigQueryError{reason=invalid, location=null,
+message=Error while reading table: kestra-sandbox.zooomcamp.yellow_tripdata_2020_01,
+error message: CSV table references column position 17, but line contains only 14 columns.;
+line_number: 2103925 byte_offset_to_start_of_line: 194863028
+column_index: 17 column_name: "congestion_surcharge" column_type: NUMERIC
+File: gs://anna-geller/yellow_tripdata_2020-01.csv}
+```
+
+It means that the CSV file you're trying to load into BigQuery has a mismatch in the number of columns between the external source table (i.e. the file in GCS) and the destination table in BigQuery. This can happen when, due to network or transfer issues, the file is not fully downloaded from GitHub or not correctly uploaded to GCS. The error message suggests a schema issue, but that's not the root cause. Simply rerun the entire execution, including redownloading the CSV file and reuploading it to GCS; this should resolve the issue.
+
+---
+
+## Homework
+
+See [homework.md](homework.md) in this folder
+
+
+---
+
+# Community notes
+
+Did you take notes? You can share them by creating a PR to this file!
+
+* [Notes from Manuel Guerra](https://github.com/ManuelGuerra1987/data-engineering-zoomcamp-notes/blob/main/2_Workflow-Orchestration-(Kestra)/README.md)
+* [Notes from Horeb Seidou](https://spotted-hardhat-eea.notion.site/Week-2-Workflow-Orchestration-17129780dc4a80148debf61e6453fffe)
+* [Notes from Livia](https://docs.google.com/document/d/1Y_QMonvEtFPbXIzmdpCSVsKNC1BWAHFBA1mpK9qaZko/edit?usp=sharing)
+* [2025 Gitbook Notes from Tinker0425](https://data-engineering-zoomcamp-2025-t.gitbook.io/tinker0425/module-2/introduction-to-module-2)
+* [Notes from Mercy Markus: Linux/Fedora Tweaks and Tips](https://mercymarkus.com/posts/2025/series/dtc-dez-jan-2025/dtc-dez-2025-module-2/)
+* Add your notes above this line
+
+---
+
+# Previous Cohorts
+
+* 2022: [notes](../../2022/week_2_data_ingestion#community-notes) and [videos](../../2022/week_2_data_ingestion)
+* 2023: [notes](../../2023/week_2_workflow_orchestration#community-notes) and [videos](../../2023/week_2_workflow_orchestration)
+* 2024: [notes](../../2024/02-workflow-orchestration#community-notes) and [videos](../../2024/02-workflow-orchestration)
+
diff --git a/cohorts/2025/02-workflow-orchestration/flows/01_getting_started_data_pipeline.yaml b/cohorts/2025/02-workflow-orchestration/flows/01_getting_started_data_pipeline.yaml
new file mode 100644
index 000000000..e2f04341b
--- /dev/null
+++ b/cohorts/2025/02-workflow-orchestration/flows/01_getting_started_data_pipeline.yaml
@@ -0,0 +1,55 @@
+id: 01_getting_started_data_pipeline
+namespace: zoomcamp
+
+inputs:
+ - id: columns_to_keep
+ type: ARRAY
+ itemType: STRING
+ defaults:
+ - brand
+ - price
+
+tasks:
+ - id: extract
+ type: io.kestra.plugin.core.http.Download
+ uri: https://dummyjson.com/products
+
+ - id: transform
+ type: io.kestra.plugin.scripts.python.Script
+ containerImage: python:3.11-alpine
+ inputFiles:
+ data.json: "{{outputs.extract.uri}}"
+ outputFiles:
+ - "*.json"
+ env:
+ COLUMNS_TO_KEEP: "{{inputs.columns_to_keep}}"
+ script: |
+ import json
+ import os
+
+ columns_to_keep_str = os.getenv("COLUMNS_TO_KEEP")
+ columns_to_keep = json.loads(columns_to_keep_str)
+
+ with open("data.json", "r") as file:
+ data = json.load(file)
+
+ filtered_data = [
+ {column: product.get(column, "N/A") for column in columns_to_keep}
+ for product in data["products"]
+ ]
+
+ with open("products.json", "w") as file:
+ json.dump(filtered_data, file, indent=4)
+
+ - id: query
+ type: io.kestra.plugin.jdbc.duckdb.Query
+ inputFiles:
+ products.json: "{{outputs.transform.outputFiles['products.json']}}"
+ sql: |
+ INSTALL json;
+ LOAD json;
+ SELECT brand, round(avg(price), 2) as avg_price
+ FROM read_json_auto('{{workingDir}}/products.json')
+ GROUP BY brand
+ ORDER BY avg_price DESC;
+ fetchType: STORE
diff --git a/02-workflow-orchestration/flows/02_postgres_taxi.yaml b/cohorts/2025/02-workflow-orchestration/flows/02_postgres_taxi.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/02_postgres_taxi.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/02_postgres_taxi.yaml
diff --git a/02-workflow-orchestration/flows/02_postgres_taxi_scheduled.yaml b/cohorts/2025/02-workflow-orchestration/flows/02_postgres_taxi_scheduled.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/02_postgres_taxi_scheduled.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/02_postgres_taxi_scheduled.yaml
diff --git a/02-workflow-orchestration/flows/03_postgres_dbt.yaml b/cohorts/2025/02-workflow-orchestration/flows/03_postgres_dbt.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/03_postgres_dbt.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/03_postgres_dbt.yaml
diff --git a/02-workflow-orchestration/flows/04_gcp_kv.yaml b/cohorts/2025/02-workflow-orchestration/flows/04_gcp_kv.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/04_gcp_kv.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/04_gcp_kv.yaml
diff --git a/02-workflow-orchestration/flows/05_gcp_setup.yaml b/cohorts/2025/02-workflow-orchestration/flows/05_gcp_setup.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/05_gcp_setup.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/05_gcp_setup.yaml
diff --git a/02-workflow-orchestration/flows/06_gcp_taxi.yaml b/cohorts/2025/02-workflow-orchestration/flows/06_gcp_taxi.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/06_gcp_taxi.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/06_gcp_taxi.yaml
diff --git a/02-workflow-orchestration/flows/06_gcp_taxi_scheduled.yaml b/cohorts/2025/02-workflow-orchestration/flows/06_gcp_taxi_scheduled.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/06_gcp_taxi_scheduled.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/06_gcp_taxi_scheduled.yaml
diff --git a/02-workflow-orchestration/flows/07_gcp_dbt.yaml b/cohorts/2025/02-workflow-orchestration/flows/07_gcp_dbt.yaml
similarity index 100%
rename from 02-workflow-orchestration/flows/07_gcp_dbt.yaml
rename to cohorts/2025/02-workflow-orchestration/flows/07_gcp_dbt.yaml
diff --git a/cohorts/2025/02-workflow-orchestration/images/homework.png b/cohorts/2025/02-workflow-orchestration/images/homework.png
new file mode 100644
index 000000000..f2832c32f
Binary files /dev/null and b/cohorts/2025/02-workflow-orchestration/images/homework.png differ
diff --git a/cohorts/2025/02-workflow-orchestration/solution.md b/cohorts/2025/02-workflow-orchestration/solution.md
deleted file mode 100644
index da67579e7..000000000
--- a/cohorts/2025/02-workflow-orchestration/solution.md
+++ /dev/null
@@ -1,50 +0,0 @@
-## Question 1
-
-```
-Within the execution for Yellow Taxi data for the year 2020 and month 12: what is the uncompressed file size (i.e. the output file yellow_tripdata_2020-12.csv of the extract task)?
-```
-
-To get this answer, you need to go to the Outputs tab in Kestra and select the file. The size will be next to the preview and download button.
-Answer: `128.3 MB`
-
-## Question 2
-
-```
-What is the rendered value of the variable file when the inputs taxi is set to green, year is set to 2020, and month is set to 04 during execution?
-```
-
-To get this answer, you can run the expression in [Debug Outputs](https://youtu.be/SPGmXSJN3VE) to see it rendered.
-
-Answer: `green_tripdata_2020-04.csv`
-
-## Question 3
-
-```
-How many rows are there for the Yellow Taxi data for all CSV files in the year 2020?
-```
-
-Answer: `24,648,499`
-
-## Question 4
-
-```
-How many rows are there for the Green Taxi data for all CSV files in the year 2020?
-```
-
-Answer: `1,734,051`
-
-## Question 5
-
-```
-How many rows are there for the Yellow Taxi data for the March 2021 CSV file?
-```
-
-Answer: `1,925,152`
-
-## Question 6
-
-```
-How would you configure the timezone to New York in a Schedule trigger?
-```
-
-Answer: `Add a timezone property set to America/New_York in the Schedule trigger configuration`
diff --git a/cohorts/2026/02-workflow-orchestration/homework.md b/cohorts/2026/02-workflow-orchestration/homework.md
new file mode 100644
index 000000000..b8aa9ed06
--- /dev/null
+++ b/cohorts/2026/02-workflow-orchestration/homework.md
@@ -0,0 +1,72 @@
+## Module 2 Homework
+
+ATTENTION: At the end of the submission form, you will be required to include a link to your GitHub repository or another public code-hosting site. This repository should contain your code for solving the homework. If your solution includes code that is not in file format, please include it directly in the README file of your repository.
+
+> In case you don't get one option exactly, select the closest one
+
+For the homework, we'll be working with the _green_ taxi dataset located here:
+
+`https://github.com/DataTalksClub/nyc-tlc-data/releases/tag/green`
+
+To get a `wget`-able link, use this prefix (note that the link itself gives 404):
+
+`https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/`
+
+### Assignment
+
+So far in the course, we processed data for the years 2019 and 2020. Your task is to extend the existing flows to include data for the year 2021.
+
+
+
+As a hint, Kestra makes that process really easy:
+1. You can leverage the backfill functionality in the [scheduled flow](../../../02-workflow-orchestration/flows/06_gcp_taxi_scheduled.yaml) to backfill the data for the year 2021. Just make sure to select a time period for which data exists, i.e. from `2021-01-01` to `2021-07-31`. Also, make sure to do the same for both `yellow` and `green` taxi data (select the right service in the `taxi` input).
+2. Alternatively, run the flow manually for each of the seven months of 2021 for both `yellow` and `green` taxi data. Challenge for you: find out how to loop over the combinations of year-month and `taxi` type using a `ForEach` task that triggers the flow for each combination via a `Subflow` task (see the sketch below).
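+
+If you attempt the challenge, a skeleton might look like this. This is a sketch, not the official solution: the target `flowId` is assumed, and the expression for reading the outer loop's value may differ across Kestra versions:
+
+```yaml
+id: backfill_2021
+namespace: zoomcamp
+
+tasks:
+  - id: each_taxi
+    type: io.kestra.plugin.core.flow.ForEach
+    values: ["yellow", "green"]
+    tasks:
+      - id: each_month
+        type: io.kestra.plugin.core.flow.ForEach
+        values: ["01", "02", "03", "04", "05", "06", "07"]
+        tasks:
+          - id: run_ingestion
+            type: io.kestra.plugin.core.flow.Subflow
+            namespace: zoomcamp
+            flowId: 06_gcp_taxi                     # assumed target flow, adjust to your setup
+            wait: true
+            inputs:
+              taxi: "{{ parent.taskrun.value }}"    # outer loop value (taxi type)
+              year: "2021"
+              month: "{{ taskrun.value }}"          # inner loop value (month)
+```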
+
+### Quiz Questions
+
+Complete the quiz shown below. It's a set of 6 multiple-choice questions to test your understanding of workflow orchestration, Kestra, and ETL pipelines.
+
+1) Within the execution for `Yellow` Taxi data for the year `2020` and month `12`: what is the uncompressed file size (i.e. the output file `yellow_tripdata_2020-12.csv` of the `extract` task)?
+- 128.3 MiB
+- 134.5 MiB
+- 364.7 MiB
+- 692.6 MiB
+
+2) What is the rendered value of the variable `file` when the inputs `taxi` is set to `green`, `year` is set to `2020`, and `month` is set to `04` during execution?
+- `{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv`
+- `green_tripdata_2020-04.csv`
+- `green_tripdata_04_2020.csv`
+- `green_tripdata_2020.csv`
+
+3) How many rows are there for the `Yellow` Taxi data for all CSV files in the year 2020?
+- 13,537,299
+- 24,648,499
+- 18,324,219
+- 29,430,127
+
+4) How many rows are there for the `Green` Taxi data for all CSV files in the year 2020?
+- 5,327,301
+- 936,199
+- 1,734,051
+- 1,342,034
+
+5) How many rows are there for the `Yellow` Taxi data for the March 2021 CSV file?
+- 1,428,092
+- 706,911
+- 1,925,152
+- 2,561,031
+
+6) How would you configure the timezone to New York in a Schedule trigger?
+- Add a `timezone` property set to `EST` in the `Schedule` trigger configuration
+- Add a `timezone` property set to `America/New_York` in the `Schedule` trigger configuration
+- Add a `timezone` property set to `UTC-5` in the `Schedule` trigger configuration
+- Add a `location` property set to `New_York` in the `Schedule` trigger configuration
+
+## Submitting the solutions
+
+* Form for submitting: https://courses.datatalks.club/de-zoomcamp-2025/homework/hw2
+* Check the link above to see the due date
+
+## Solution
+
+Will be added after the due date