|
| 1 | +# Copyright 2025 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | + |
| 16 | +def run_udf_and_read_gbq_function( |
| 17 | + project_id: str, dataset_id: str, routine_id: str |
| 18 | +) -> None: |
| 19 | + your_gcp_project_id = project_id |
| 20 | + your_bq_dataset_id = dataset_id |
| 21 | + your_bq_routine_id = routine_id |
| 22 | + |
| 23 | + # [START bigquery_dataframes_udf] |
| 24 | + import bigframes.pandas as bpd |
| 25 | + |
| 26 | + # Set BigQuery DataFrames options |
| 27 | + bpd.options.bigquery.project = your_gcp_project_id |
| 28 | + bpd.options.bigquery.location = "US" |
| 29 | + |
| 30 | + # BigQuery DataFrames gives you the ability to turn your custom functions |
| 31 | + # into a BigQuery Python UDF. One can find more details about the usage and |
| 32 | + # the requirements via `help` command. |
| 33 | + help(bpd.udf) |
| 34 | + |
| 35 | + # Read a table and inspect the column of interest. |
| 36 | + df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins") |
| 37 | + df["body_mass_g"].peek(10) |
| 38 | + |
| 39 | + # Define a custom function, and specify the intent to turn it into a |
| 40 | + # BigQuery Python UDF. Let's try a `pandas`-like use case in which we want |
| 41 | + # to apply a user defined function to every value in a `Series`, more |
| 42 | + # specifically bucketize the `body_mass_g` value of the penguins, which is a |
| 43 | + # real number, into a category, which is a string. |
| 44 | + @bpd.udf( |
| 45 | + dataset=your_bq_dataset_id, |
| 46 | + name=your_bq_routine_id, |
| 47 | + ) |
| 48 | + def get_bucket(num: float) -> str: |
| 49 | + if not num: |
| 50 | + return "NA" |
| 51 | + boundary = 4000 |
| 52 | + return "at_or_above_4000" if num >= boundary else "below_4000" |
| 53 | + |
| 54 | + # Then we can apply the udf on the `Series` of interest via |
| 55 | + # `apply` API and store the result in a new column in the DataFrame. |
| 56 | + df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket)) |
| 57 | + |
| 58 | + # This will add a new column `body_mass_bucket` in the DataFrame. You can |
| 59 | + # preview the original value and the bucketized value side by side. |
| 60 | + df[["body_mass_g", "body_mass_bucket"]].peek(10) |
| 61 | + |
| 62 | + # The above operation was possible by doing all the computation on the |
| 63 | + # cloud through an underlying BigQuery Python UDF that was created to |
| 64 | + # support the user's operations in the Python code. |
| 65 | + |
| 66 | + # The BigQuery Python UDF created to support the BigQuery DataFrames |
| 67 | + # udf can be located via a property `bigframes_bigquery_function` |
| 68 | + # set in the udf object. |
| 69 | + print(f"Created BQ Python UDF: {get_bucket.bigframes_bigquery_function}") |
| 70 | + |
| 71 | + # If you have already defined a custom function in BigQuery, either via the |
| 72 | + # BigQuery Google Cloud Console or with the `udf` decorator, |
| 73 | + # or otherwise, you may use it with BigQuery DataFrames with the |
| 74 | + # `read_gbq_function` method. More details are available via the `help` |
| 75 | + # command. |
| 76 | + help(bpd.read_gbq_function) |
| 77 | + |
| 78 | + existing_get_bucket_bq_udf = get_bucket.bigframes_bigquery_function |
| 79 | + |
| 80 | + # Here is an example of using `read_gbq_function` to load an existing |
| 81 | + # BigQuery Python UDF. |
| 82 | + df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins") |
| 83 | + get_bucket_function = bpd.read_gbq_function(existing_get_bucket_bq_udf) |
| 84 | + |
| 85 | + df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket_function)) |
| 86 | + df.peek(10) |
| 87 | + |
| 88 | + # Let's continue trying other potential use cases of udf. Let's say we |
| 89 | + # consider the `species`, `island` and `sex` of the penguins sensitive |
| 90 | + # information and want to redact that by replacing with their hash code |
| 91 | + # instead. Let's define another scalar custom function and decorate it |
| 92 | + # as a udf. The custom function in this example has external package |
| 93 | + # dependency, which can be specified via `packages` parameter. |
| 94 | + @bpd.udf( |
| 95 | + dataset=your_bq_dataset_id, |
| 96 | + name=your_bq_routine_id, |
| 97 | + packages=["cryptography"], |
| 98 | + ) |
| 99 | + def get_hash(input: str) -> str: |
| 100 | + from cryptography.fernet import Fernet |
| 101 | + |
| 102 | + # handle missing value |
| 103 | + if input is None: |
| 104 | + input = "" |
| 105 | + |
| 106 | + key = Fernet.generate_key() |
| 107 | + f = Fernet(key) |
| 108 | + return f.encrypt(input.encode()).decode() |
| 109 | + |
| 110 | + # We can use this udf in another `pandas`-like API `map` that |
| 111 | + # can be applied on a DataFrame |
| 112 | + df_redacted = df[["species", "island", "sex"]].map(get_hash) |
| 113 | + df_redacted.peek(10) |
| 114 | + |
| 115 | + # [END bigquery_dataframes_udf] |
| 116 | + |
| 117 | + # Clean up cloud artifacts |
| 118 | + session = bpd.get_global_session() |
| 119 | + session.bqclient.delete_routine( |
| 120 | + f"{your_bq_dataset_id}.{your_bq_routine_id}", not_found_ok=True |
| 121 | + ) |
0 commit comments