diff --git a/samples/dbt/.dbt.yml b/samples/dbt/.dbt.yml
new file mode 100644
index 0000000000..98053bfc37
--- /dev/null
+++ b/samples/dbt/.dbt.yml
@@ -0,0 +1,13 @@
+dbt_sample_project:
+  outputs:
+    dev: # The target environment name (e.g., dev, prod)
+      compute_region: us-central1 # Region used for compute operations
+      dataset: dbt_sample_dataset # BigQuery dataset where dbt will create models
+      gcs_bucket: dbt_sample_bucket # GCS bucket to store output files
+      location: US # BigQuery dataset location
+      method: oauth # Authentication method
+      priority: interactive # Job priority: "interactive" or "batch"
+      project: bigframes-dev # GCP project ID
+      threads: 1 # Number of threads dbt can use for running models in parallel
+      type: bigquery # Specifies the dbt adapter
+  target: dev # The default target environment
diff --git a/samples/dbt/README.md b/samples/dbt/README.md
new file mode 100644
index 0000000000..c52b633116
--- /dev/null
+++ b/samples/dbt/README.md
@@ -0,0 +1,62 @@
+# dbt BigFrames Integration
+
+This repository provides simple examples of using **dbt Python models** with **BigQuery** in **BigFrames** mode.
+
+It includes basic configurations and sample models to help you get started quickly in a typical dbt project.
+
+## Highlights
+
+- `profiles.yml`: configures your connection to BigQuery.
+- `dbt_project.yml`: configures your dbt project, **dbt_sample_project**.
+- `dbt_bigframes_code_sample_1.py`: an example that reads BigQuery data and performs basic transformations.
+- `dbt_bigframes_code_sample_2.py`: an example that builds an incremental model leveraging BigFrames UDF capabilities.
+
+## Requirements
+
+Before using this project, ensure you have:
+
+- A [Google Cloud account](https://cloud.google.com/free?hl=en)
+- A [dbt Cloud account](https://www.getdbt.com/signup) (if using dbt Cloud)
+- Basic knowledge of Python and SQL
+- Familiarity with dbt concepts and structure
+
+For more, see:
+- https://docs.getdbt.com/guides/dbt-python-bigframes
+- https://cloud.google.com/bigquery/docs/dataframes-dbt
+
+## Run Locally
+
+Follow these steps to run the Python models using dbt Core.
+
+1. **Install the dbt BigQuery adapter:**
+
+   ```bash
+   pip install dbt-bigquery
+   ```
+
+2. **Initialize a dbt project (if not already done):**
+
+   ```bash
+   dbt init
+   ```
+
+   Follow the prompts to complete setup.
+
+3. **Finish the configuration and add sample code:**
+
+   - Edit `~/.dbt/profiles.yml` to finish the configuration.
+   - Replace or add code samples in `.../models/example`.
+
+4. **Run your dbt models:**
+
+   To run all models:
+
+   ```bash
+   dbt run
+   ```
+
+   Or run a specific model:
+
+   ```bash
+   dbt run --select your_model_name
+   ```
\ No newline at end of file
diff --git a/samples/dbt/dbt_sample_project/dbt_project.yml b/samples/dbt/dbt_sample_project/dbt_project.yml
new file mode 100644
index 0000000000..d12098a18a
--- /dev/null
+++ b/samples/dbt/dbt_sample_project/dbt_project.yml
@@ -0,0 +1,39 @@
+
+# Name your project! Project names should contain only lowercase characters
+# and underscores. A good package name should reflect your organization's
+# name or the intended use of these models.
+name: 'dbt_sample_project'
+version: '1.0.0'
+
+# This setting configures which "profile" dbt uses for this project.
+profile: 'dbt_sample_project'
+
+# These configurations specify where dbt should look for different types of files.
+# The `model-paths` config, for example, states that models in this project can be
+# found in the "models/" directory. You probably won't need to change these!
+model-paths: ["models"]
+analysis-paths: ["analyses"]
+test-paths: ["tests"]
+seed-paths: ["seeds"]
+macro-paths: ["macros"]
+snapshot-paths: ["snapshots"]
+
+clean-targets: # directories to be removed by `dbt clean`
+  - "target"
+  - "dbt_packages"
+
+
+# Configuring models
+# Full documentation: https://docs.getdbt.com/docs/configuring-models
+
+# In this example config, we tell dbt to build all models in the example/
+# directory as views. These settings can be overridden in the individual model
+# files using the `{{ config(...) }}` macro.
+models:
+  dbt_sample_project:
+    # Optional: These settings (e.g., submission_method, notebook_template_id,
+    # etc.) can also be defined directly in the Python model using dbt.config.
+    submission_method: bigframes
+    # Configs prefixed with + apply to all files under models/example/.
+    example:
+      +materialized: view
diff --git a/samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_1.py b/samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_1.py
new file mode 100644
index 0000000000..4c8ddf8f6c
--- /dev/null
+++ b/samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_1.py
@@ -0,0 +1,58 @@
+# This example demonstrates one of the most common use cases: transforming raw
+# BigQuery data into a processed table using a dbt Python model with BigFrames.
+# See more at: https://cloud.google.com/bigquery/docs/dataframes-dbt.
+#
+# Key defaults when using BigFrames in a dbt Python model for BigQuery:
+# - The default materialization is 'table' unless specified otherwise. This
+#   means dbt will create a new BigQuery table from the result of this model.
+# - The default timeout for the job is 3600 seconds (60 minutes). This can be
+#   adjusted if your processing requires more time.
+# - If no runtime template is provided, dbt will automatically create and reuse
+#   a default one for executing the Python code in BigQuery.
+#
+# BigFrames provides a pandas-like API for BigQuery data, enabling familiar
+# data manipulation directly within your dbt project. This code sample
+# illustrates a basic pattern for:
+# 1. Reading data from an existing BigQuery dataset.
+# 2. Processing it using pandas-like DataFrame operations powered by BigFrames.
+# 3. Outputting a cleaned and transformed table, managed by dbt.
+
+
+def model(dbt, session):
+    # Optional: Override settings from your dbt_project.yml file.
+    # When both are set, dbt.config takes precedence over dbt_project.yml.
+    #
+    # Use `dbt.config(submission_method="bigframes")` to tell dbt to execute
+    # this Python model using BigQuery DataFrames (BigFrames). This allows you
+    # to write pandas-like code that operates directly on BigQuery data
+    # without needing to pull all data into memory.
+    dbt.config(submission_method="bigframes")
+
+    # Define the BigQuery table path from which to read data.
+    table = "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"
+
+    # Define the specific columns to select from the BigQuery table.
+    columns = ["state_name", "county_name", "date_local", "time_local", "sample_measurement"]
+
+    # Read data from the specified BigQuery table into a BigFrames DataFrame.
+    df = session.read_gbq(table, columns=columns)
+
+    # Sort the DataFrame by the specified columns. This prepares the data for
+    # `drop_duplicates` to ensure consistent duplicate removal.
+    df = df.sort_values(columns).drop_duplicates(columns)
+
+    # Group the DataFrame by 'state_name', 'county_name', and 'date_local'. For
+    # each group, calculate the minimum and maximum of the 'sample_measurement'
+    # column. The result will be a BigFrames DataFrame with a MultiIndex.
+    result = df.groupby(["state_name", "county_name", "date_local"])["sample_measurement"]\
+        .agg(["min", "max"])
+
+    # Rename some columns and convert the MultiIndex of the 'result' DataFrame
+    # into regular columns. This flattens the DataFrame so 'state_name',
+    # 'county_name', and 'date_local' become regular columns again.
+    result = result.rename(columns={'min': 'min_temperature', 'max': 'max_temperature'})\
+        .reset_index()
+
+    # Return the processed BigFrames DataFrame.
+    # In a dbt Python model, this DataFrame will be materialized as a table.
+    return result
diff --git a/samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_2.py b/samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_2.py
new file mode 100644
index 0000000000..019e503393
--- /dev/null
+++ b/samples/dbt/dbt_sample_project/models/example/dbt_bigframes_code_sample_2.py
@@ -0,0 +1,67 @@
+# This example demonstrates how to build an **incremental dbt Python model**
+# using BigFrames.
+#
+# Incremental models are essential for efficiently processing large datasets by
+# only transforming new or changed data, rather than reprocessing the entire
+# dataset every time. If the target table already exists, dbt will perform a
+# merge based on the specified unique keys; otherwise, it will create a new
+# table automatically.
+#
+# This model also showcases the definition and application of a **BigFrames
+# User-Defined Function (UDF)** to add a descriptive summary column based on
+# temperature data. BigFrames UDFs allow you to execute custom Python logic
+# directly within BigQuery, leveraging BigQuery's scalability.
+
+
+import bigframes.pandas as bpd
+
+def model(dbt, session):
+    # Optional: override settings from dbt_project.yml.
+    # When both are set, dbt.config takes precedence over dbt_project.yml.
+    dbt.config(
+        # Use BigFrames mode to execute this Python model. This enables
+        # pandas-like operations directly on BigQuery data.
+        submission_method="bigframes",
+        # Materialize this model as an 'incremental' table. This tells dbt to
+        # only process new or updated data on subsequent runs.
+        materialized='incremental',
+        # Use the MERGE strategy to update rows during incremental runs.
+        incremental_strategy='merge',
+        # Define the composite key that uniquely identifies a row in the
+        # target table. This key is used by the 'merge' strategy to match
+        # existing rows for updates during incremental runs.
+        unique_key=["state_name", "county_name", "date_local"],
+    )
+
+    # Reference an upstream dbt model or an existing BigQuery table as a
+    # BigFrames DataFrame. This lets you seamlessly use the output of another
+    # dbt model as input to this one.
+    df = dbt.ref("dbt_bigframes_code_sample_1")
+
+    # Define a BigFrames UDF to generate a temperature description.
+    # BigFrames UDFs allow you to define custom Python logic that executes
+    # directly within BigQuery. This is powerful for complex transformations.
+    @bpd.udf(dataset='dbt_sample_dataset', name='describe_udf')
+    def describe(
+        max_temperature: float,
+        min_temperature: float,
+    ) -> str:
+        is_hot = max_temperature > 85.0
+        is_cold = min_temperature < 50.0
+
+        if is_hot and is_cold:
+            return "Expect both hot and cold conditions today."
+        if is_hot:
+            return "Overall, it's a hot day."
+        if is_cold:
+            return "Overall, it's a cold day."
+        return "Comfortable throughout the day."
+
+    # Apply the UDF using combine and store the result in a column "describe".
+    df["describe"] = df["max_temperature"].combine(df["min_temperature"], describe)
+
+    # Return the transformed BigFrames DataFrame.
+    # This DataFrame will be the final output of your incremental dbt model.
+    # On subsequent runs, only new or changed rows will be processed and merged
+    # into the target BigQuery table based on the `unique_key`.
+    return df
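
After `dbt run` completes, each model is materialized as a table in the dataset configured in the profile. As a quick sanity check outside of dbt, the output can be read back with BigFrames. The snippet below is only a sketch: it assumes the sample project ID (`bigframes-dev`), dataset (`dbt_sample_dataset`), and location (`US`) from the profile above, and that dbt kept the default table name (the model file name); adjust these to your own environment.

```python
# Illustrative sketch (not part of the dbt project): read the table produced
# by dbt_bigframes_code_sample_1 back into a BigFrames DataFrame.
import bigframes.pandas as bpd

# Assumed values taken from the sample profile; change them for your setup.
bpd.options.bigquery.project = "bigframes-dev"
bpd.options.bigquery.location = "US"

# By default, dbt materializes the model under <dataset>.<model_name>.
df = bpd.read_gbq("dbt_sample_dataset.dbt_bigframes_code_sample_1")

# Preview a few rows of the per-county daily min/max temperatures.
print(df.peek(5))
```

The incremental model's output (`dbt_bigframes_code_sample_2`) can be inspected the same way; on repeat runs only new or changed rows are merged into it.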