From 72e02c8b019409f1baa61a75dc4d5202a6e6d4b9 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Tue, 6 May 2025 19:05:21 +0000 Subject: [PATCH 1/3] deps: move bigtable and pubsub to extras --- setup.py | 9 +- tests/system/large/streaming/test_bigtable.py | 104 ++++++++++++++++++ .../test_pubsub.py} | 87 +-------------- 3 files changed, 115 insertions(+), 85 deletions(-) create mode 100644 tests/system/large/streaming/test_bigtable.py rename tests/system/large/{test_streaming.py => streaming/test_pubsub.py} (57%) diff --git a/setup.py b/setup.py index 489d9aacd9..edd8e63e65 100644 --- a/setup.py +++ b/setup.py @@ -39,8 +39,6 @@ "gcsfs >=2023.3.0", "geopandas >=0.12.2", "google-auth >=2.15.0,<3.0", - "google-cloud-bigtable >=2.24.0", - "google-cloud-pubsub >=2.21.4", "google-cloud-bigquery[bqstorage,pandas] >=3.31.0", # 2.30 needed for arrow support. "google-cloud-bigquery-storage >= 2.30.0, < 3.0.0", @@ -72,7 +70,12 @@ ] extras = { # Optional test dependencies packages. If they're missed, may skip some tests. - "tests": ["freezegun", "pytest-snapshot"], + "tests": [ + "freezegun", + "pytest-snapshot", + "google-cloud-bigtable >=2.24.0", + "google-cloud-pubsub >=2.21.4", + ], # used for local engine, which is only needed for unit tests at present. "polars": ["polars >= 1.7.0"], "scikit-learn": ["scikit-learn>=1.2.2"], diff --git a/tests/system/large/streaming/test_bigtable.py b/tests/system/large/streaming/test_bigtable.py new file mode 100644 index 0000000000..4d07cbd27a --- /dev/null +++ b/tests/system/large/streaming/test_bigtable.py @@ -0,0 +1,104 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from typing import Generator +import uuid + +import pytest + +import bigframes + +pytest.importorskip("google.cloud.bigtable") + +from google.cloud import bigtable # noqa +from google.cloud.bigtable import column_family, instance, table # noqa + + +@pytest.fixture(scope="session") +def bigtable_instance(session_load: bigframes.Session) -> instance.Instance: + client = bigtable.Client(project=session_load._project, admin=True) + + instance_name = "streaming-testing-instance" + bt_instance = instance.Instance( + instance_name, + client, + ) + + if not bt_instance.exists(): + cluster_id = "streaming-testing-instance-c1" + cluster = bt_instance.cluster( + cluster_id, + location_id="us-west1-a", + serve_nodes=1, + ) + operation = bt_instance.create( + clusters=[cluster], + ) + operation.result(timeout=480) + return bt_instance + + +@pytest.fixture(scope="function") +def bigtable_table( + bigtable_instance: instance.Instance, +) -> Generator[table.Table, None, None]: + table_id = "bigframes_test_" + uuid.uuid4().hex + bt_table = table.Table( + table_id, + bigtable_instance, + ) + max_versions_rule = column_family.MaxVersionsGCRule(1) + column_family_id = "body_mass_g" + column_families = {column_family_id: max_versions_rule} + bt_table.create(column_families=column_families) + yield bt_table + bt_table.delete() + + +@pytest.mark.flaky(retries=3, delay=10) +def test_streaming_df_to_bigtable( + session_load: bigframes.Session, bigtable_table: table.Table +): + # launch a continuous query + job_id_prefix = "test_streaming_" + sdf = session_load.read_gbq_table_streaming("birds.penguins_bigtable_streaming") + + sdf = sdf[["species", "island", "body_mass_g"]] + sdf = sdf[sdf["body_mass_g"] < 4000] + sdf = sdf.rename(columns={"island": "rowkey"}) + + try: + query_job = sdf.to_bigtable( + instance="streaming-testing-instance", + table=bigtable_table.table_id, + service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com", + app_profile=None, + truncate=True, + overwrite=True, + auto_create_column_families=True, + bigtable_options={}, + job_id=None, + job_id_prefix=job_id_prefix, + ) + + # wait 100 seconds in order to ensure the query doesn't stop + # (i.e. it is continuous) + time.sleep(100) + assert query_job.running() + assert query_job.error_result is None + assert str(query_job.job_id).startswith(job_id_prefix) + assert len(list(bigtable_table.read_rows())) > 0 + finally: + query_job.cancel() diff --git a/tests/system/large/test_streaming.py b/tests/system/large/streaming/test_pubsub.py similarity index 57% rename from tests/system/large/test_streaming.py rename to tests/system/large/streaming/test_pubsub.py index f80088cf69..8ad00c97c2 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/streaming/test_pubsub.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,62 +13,22 @@ # limitations under the License. from concurrent import futures -import time from typing import Generator import uuid -from google.cloud import bigtable, pubsub # type: ignore -from google.cloud.bigtable import column_family, instance, table import pytest import bigframes +pytest.importorskip("google.cloud.pubsub") +from google.cloud import pubsub # noqa + def resource_name_full(project_id: str, resource_type: str, resource_id: str): + """Used for bigtable or pubsub resources.""" return f"projects/{project_id}/{resource_type}/{resource_id}" -@pytest.fixture(scope="session") -def bigtable_instance(session_load: bigframes.Session) -> instance.Instance: - client = bigtable.Client(project=session_load._project, admin=True) - - instance_name = "streaming-testing-instance" - bt_instance = instance.Instance( - instance_name, - client, - ) - - if not bt_instance.exists(): - cluster_id = "streaming-testing-instance-c1" - cluster = bt_instance.cluster( - cluster_id, - location_id="us-west1-a", - serve_nodes=1, - ) - operation = bt_instance.create( - clusters=[cluster], - ) - operation.result(timeout=480) - return bt_instance - - -@pytest.fixture(scope="function") -def bigtable_table( - bigtable_instance: instance.Instance, -) -> Generator[table.Table, None, None]: - table_id = "bigframes_test_" + uuid.uuid4().hex - bt_table = table.Table( - table_id, - bigtable_instance, - ) - max_versions_rule = column_family.MaxVersionsGCRule(1) - column_family_id = "body_mass_g" - column_families = {column_family_id: max_versions_rule} - bt_table.create(column_families=column_families) - yield bt_table - bt_table.delete() - - @pytest.fixture(scope="function") def pubsub_topic_id(session_load: bigframes.Session) -> Generator[str, None, None]: publisher = pubsub.PublisherClient() @@ -98,43 +58,6 @@ def pubsub_topic_subscription_ids( subscriber.delete_subscription(subscription=subscription_name) -@pytest.mark.flaky(retries=3, delay=10) -def test_streaming_df_to_bigtable( - session_load: bigframes.Session, bigtable_table: table.Table -): - # launch a continuous query - job_id_prefix = "test_streaming_" - sdf = session_load.read_gbq_table_streaming("birds.penguins_bigtable_streaming") - - sdf = sdf[["species", "island", "body_mass_g"]] - sdf = sdf[sdf["body_mass_g"] < 4000] - sdf = sdf.rename(columns={"island": "rowkey"}) - - try: - query_job = sdf.to_bigtable( - instance="streaming-testing-instance", - table=bigtable_table.table_id, - service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com", - app_profile=None, - truncate=True, - overwrite=True, - auto_create_column_families=True, - bigtable_options={}, - job_id=None, - job_id_prefix=job_id_prefix, - ) - - # wait 100 seconds in order to ensure the query doesn't stop - # (i.e. it is continuous) - time.sleep(100) - assert query_job.running() - assert query_job.error_result is None - assert str(query_job.job_id).startswith(job_id_prefix) - assert len(list(bigtable_table.read_rows())) > 0 - finally: - query_job.cancel() - - @pytest.mark.flaky(retries=3, delay=10) def test_streaming_df_to_pubsub( session_load: bigframes.Session, pubsub_topic_subscription_ids: tuple[str, str] From afeac430a84c8b417cb0c910cb8822614bad5319 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Tue, 6 May 2025 19:10:18 +0000 Subject: [PATCH 2/3] 2025 --- tests/system/large/streaming/test_bigtable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/large/streaming/test_bigtable.py b/tests/system/large/streaming/test_bigtable.py index 4d07cbd27a..e57b7e6e0e 100644 --- a/tests/system/large/streaming/test_bigtable.py +++ b/tests/system/large/streaming/test_bigtable.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From ab0d2bef3ad28c26586fff3cd096e9cbe9770382 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Tue, 6 May 2025 20:28:24 +0000 Subject: [PATCH 3/3] fix mypy --- tests/system/large/streaming/test_pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/large/streaming/test_pubsub.py b/tests/system/large/streaming/test_pubsub.py index 8ad00c97c2..277b44c93b 100644 --- a/tests/system/large/streaming/test_pubsub.py +++ b/tests/system/large/streaming/test_pubsub.py @@ -21,7 +21,7 @@ import bigframes pytest.importorskip("google.cloud.pubsub") -from google.cloud import pubsub # noqa +from google.cloud import pubsub # type: ignore # noqa def resource_name_full(project_id: str, resource_type: str, resource_id: str):