|
1 |
| -# Copyright 2024 Google LLC |
| 1 | +# Copyright 2025 Google LLC |
2 | 2 | #
|
3 | 3 | # Licensed under the Apache License, Version 2.0 (the "License");
|
4 | 4 | # you may not use this file except in compliance with the License.
|
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
15 | 15 | from concurrent import futures
|
16 |
| -import time |
17 | 16 | from typing import Generator
|
18 | 17 | import uuid
|
19 | 18 |
|
20 |
| -from google.cloud import bigtable, pubsub # type: ignore |
21 |
| -from google.cloud.bigtable import column_family, instance, table |
22 | 19 | import pytest
|
23 | 20 |
|
24 | 21 | import bigframes
|
25 | 22 |
|
| 23 | +pytest.importorskip("google.cloud.pubsub") |
| 24 | +from google.cloud import pubsub # type: ignore # noqa |
| 25 | + |
26 | 26 |
|
27 | 27 | def resource_name_full(project_id: str, resource_type: str, resource_id: str):
|
| 28 | + """Used for bigtable or pubsub resources.""" |
28 | 29 | return f"projects/{project_id}/{resource_type}/{resource_id}"
|
29 | 30 |
|
30 | 31 |
|
31 |
| -@pytest.fixture(scope="session") |
32 |
| -def bigtable_instance(session_load: bigframes.Session) -> instance.Instance: |
33 |
| - client = bigtable.Client(project=session_load._project, admin=True) |
34 |
| - |
35 |
| - instance_name = "streaming-testing-instance" |
36 |
| - bt_instance = instance.Instance( |
37 |
| - instance_name, |
38 |
| - client, |
39 |
| - ) |
40 |
| - |
41 |
| - if not bt_instance.exists(): |
42 |
| - cluster_id = "streaming-testing-instance-c1" |
43 |
| - cluster = bt_instance.cluster( |
44 |
| - cluster_id, |
45 |
| - location_id="us-west1-a", |
46 |
| - serve_nodes=1, |
47 |
| - ) |
48 |
| - operation = bt_instance.create( |
49 |
| - clusters=[cluster], |
50 |
| - ) |
51 |
| - operation.result(timeout=480) |
52 |
| - return bt_instance |
53 |
| - |
54 |
| - |
55 |
| -@pytest.fixture(scope="function") |
56 |
| -def bigtable_table( |
57 |
| - bigtable_instance: instance.Instance, |
58 |
| -) -> Generator[table.Table, None, None]: |
59 |
| - table_id = "bigframes_test_" + uuid.uuid4().hex |
60 |
| - bt_table = table.Table( |
61 |
| - table_id, |
62 |
| - bigtable_instance, |
63 |
| - ) |
64 |
| - max_versions_rule = column_family.MaxVersionsGCRule(1) |
65 |
| - column_family_id = "body_mass_g" |
66 |
| - column_families = {column_family_id: max_versions_rule} |
67 |
| - bt_table.create(column_families=column_families) |
68 |
| - yield bt_table |
69 |
| - bt_table.delete() |
70 |
| - |
71 |
| - |
72 | 32 | @pytest.fixture(scope="function")
|
73 | 33 | def pubsub_topic_id(session_load: bigframes.Session) -> Generator[str, None, None]:
|
74 | 34 | publisher = pubsub.PublisherClient()
|
@@ -98,43 +58,6 @@ def pubsub_topic_subscription_ids(
|
98 | 58 | subscriber.delete_subscription(subscription=subscription_name)
|
99 | 59 |
|
100 | 60 |
|
101 |
| -@pytest.mark.flaky(retries=3, delay=10) |
102 |
| -def test_streaming_df_to_bigtable( |
103 |
| - session_load: bigframes.Session, bigtable_table: table.Table |
104 |
| -): |
105 |
| - # launch a continuous query |
106 |
| - job_id_prefix = "test_streaming_" |
107 |
| - sdf = session_load.read_gbq_table_streaming("birds.penguins_bigtable_streaming") |
108 |
| - |
109 |
| - sdf = sdf[["species", "island", "body_mass_g"]] |
110 |
| - sdf = sdf[sdf["body_mass_g"] < 4000] |
111 |
| - sdf = sdf.rename(columns={"island": "rowkey"}) |
112 |
| - |
113 |
| - try: |
114 |
| - query_job = sdf.to_bigtable( |
115 |
| - instance="streaming-testing-instance", |
116 |
| - table=bigtable_table.table_id, |
117 |
| - service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com", |
118 |
| - app_profile=None, |
119 |
| - truncate=True, |
120 |
| - overwrite=True, |
121 |
| - auto_create_column_families=True, |
122 |
| - bigtable_options={}, |
123 |
| - job_id=None, |
124 |
| - job_id_prefix=job_id_prefix, |
125 |
| - ) |
126 |
| - |
127 |
| - # wait 100 seconds in order to ensure the query doesn't stop |
128 |
| - # (i.e. it is continuous) |
129 |
| - time.sleep(100) |
130 |
| - assert query_job.running() |
131 |
| - assert query_job.error_result is None |
132 |
| - assert str(query_job.job_id).startswith(job_id_prefix) |
133 |
| - assert len(list(bigtable_table.read_rows())) > 0 |
134 |
| - finally: |
135 |
| - query_job.cancel() |
136 |
| - |
137 |
| - |
138 | 61 | @pytest.mark.flaky(retries=3, delay=10)
|
139 | 62 | def test_streaming_df_to_pubsub(
|
140 | 63 | session_load: bigframes.Session, pubsub_topic_subscription_ids: tuple[str, str]
|
|
0 commit comments