Merged
20 changes: 16 additions & 4 deletions tests/system/conftest.py
@@ -14,13 +14,25 @@
 
 import pytest
 
+from google.cloud import bigquery
+import test_utils.prefixer
+
 from . import helpers
 
+prefixer = test_utils.prefixer.Prefixer("python-bigquery", "tests/system")
+
+
+@pytest.fixture(scope="session", autouse=True)
+def cleanup_datasets(bigquery_client: bigquery.Client):
+    for dataset in bigquery_client.list_datasets():
+        if prefixer.should_cleanup(dataset.dataset_id):
+            bigquery_client.delete_dataset(
+                dataset, delete_contents=True, not_found_ok=True
+            )
+
 
 @pytest.fixture(scope="session")
 def bigquery_client():
-    from google.cloud import bigquery
-
     return bigquery.Client()


@@ -33,10 +45,10 @@ def bqstorage_client(bigquery_client):
 
 @pytest.fixture(scope="session")
 def dataset_id(bigquery_client):
-    dataset_id = f"bqsystem_{helpers.temp_suffix()}"
+    dataset_id = prefixer.create_prefix()
     bigquery_client.create_dataset(dataset_id)
     yield dataset_id
-    bigquery_client.delete_dataset(dataset_id, delete_contents=True)
+    bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
 
 
 @pytest.fixture
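Note: the new conftest relies on only two behaviors of test_utils.prefixer.Prefixer: create_prefix() mints a fresh dataset ID carrying a recognizable prefix, and should_cleanup() identifies IDs minted by earlier runs so the autouse fixture can delete leaked datasets. A minimal sketch of that contract, assuming a timestamp-based naming scheme (the real test_utils implementation may differ in format and in how it avoids racing concurrent runs):

import datetime
import re


class Prefixer:
    """Sketch of the prefixer contract relied on above; not the real test_utils code."""

    def __init__(self, package: str, subdir: str):
        # Derive a stable prefix from the package and test directory,
        # e.g. "python_bigquery_tests_system".
        self._prefix = re.sub(r"\W", "_", f"{package}_{subdir}")

    def create_prefix(self) -> str:
        # Mint a unique, timestamped ID so separate runs are distinguishable.
        now = datetime.datetime.now(datetime.timezone.utc)
        return f"{self._prefix}_{now.strftime('%Y%m%d%H%M%S')}"

    def should_cleanup(self, resource_id: str) -> bool:
        # Anything carrying the prefix was created by this suite; a real
        # implementation would also check the embedded timestamp's age so
        # it does not delete datasets belonging to an in-flight run.
        return resource_id.startswith(self._prefix)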
222 changes: 106 additions & 116 deletions tests/system/test_client.py
@@ -154,7 +154,6 @@ class Config(object):
 
     CLIENT: Optional[bigquery.Client] = None
     CURSOR = None
-    DATASET = None
 
 
 def setUpModule():
@@ -164,9 +163,7 @@ def setUpModule():
 
 class TestBigQuery(unittest.TestCase):
     def setUp(self):
-        Config.DATASET = _make_dataset_id("bq_system_tests")
-        dataset = Config.CLIENT.create_dataset(Config.DATASET)
-        self.to_delete = [dataset]
+        self.to_delete = []
 
     def tearDown(self):
         policy_tag_client = PolicyTagManagerClient()
@@ -1622,20 +1619,6 @@ def test_dbapi_fetchall_from_script(self):
         row_tuples = [r.values() for r in rows]
         self.assertEqual(row_tuples, [(5, "foo"), (6, "bar"), (7, "baz")])
 
-    def test_dbapi_create_view(self):
-
-        query = """
-        CREATE VIEW {}.dbapi_create_view
-        AS SELECT name, SUM(number) AS total
-        FROM `bigquery-public-data.usa_names.usa_1910_2013`
-        GROUP BY name;
-        """.format(
-            Config.DATASET
-        )
-
-        Config.CURSOR.execute(query)
-        self.assertEqual(Config.CURSOR.rowcount, 0, "expected 0 rows")
-
     @unittest.skipIf(
         bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
     )
@@ -2476,104 +2459,6 @@ def test_list_rows_page_size(self):
         page = next(pages)
         self.assertEqual(page.num_items, num_last_page)
 
-    def test_parameterized_types_round_trip(self):
-        client = Config.CLIENT
-        table_id = f"{Config.DATASET}.test_parameterized_types_round_trip"
-        fields = (
-            ("n", "NUMERIC"),
-            ("n9", "NUMERIC(9)"),
-            ("n92", "NUMERIC(9, 2)"),
-            ("bn", "BIGNUMERIC"),
-            ("bn9", "BIGNUMERIC(38)"),
-            ("bn92", "BIGNUMERIC(38, 22)"),
-            ("s", "STRING"),
-            ("s9", "STRING(9)"),
-            ("b", "BYTES"),
-            ("b9", "BYTES(9)"),
-        )
-        self.to_delete.insert(0, Table(f"{client.project}.{table_id}"))
-        client.query(
-            "create table {} ({})".format(
-                table_id, ", ".join(" ".join(f) for f in fields)
-            )
-        ).result()
-        table = client.get_table(table_id)
-        table_id2 = table_id + "2"
-        self.to_delete.insert(0, Table(f"{client.project}.{table_id2}"))
-        client.create_table(Table(f"{client.project}.{table_id2}", table.schema))
-        table2 = client.get_table(table_id2)
-
-        self.assertEqual(tuple(s._key()[:2] for s in table2.schema), fields)
-
-    def test_table_snapshots(self):
-        from google.cloud.bigquery import CopyJobConfig
-        from google.cloud.bigquery import OperationType
-
-        client = Config.CLIENT
-
-        source_table_path = f"{client.project}.{Config.DATASET}.test_table"
-        snapshot_table_path = f"{source_table_path}_snapshot"
-
-        # Create the table before loading so that the column order is predictable.
-        schema = [
-            bigquery.SchemaField("foo", "INTEGER"),
-            bigquery.SchemaField("bar", "STRING"),
-        ]
-        source_table = helpers.retry_403(Config.CLIENT.create_table)(
-            Table(source_table_path, schema=schema)
-        )
-        self.to_delete.insert(0, source_table)
-
-        # Populate the table with initial data.
-        rows = [{"foo": 1, "bar": "one"}, {"foo": 2, "bar": "two"}]
-        load_job = Config.CLIENT.load_table_from_json(rows, source_table)
-        load_job.result()
-
-        # Now create a snapshot before modifying the original table data.
-        copy_config = CopyJobConfig()
-        copy_config.operation_type = OperationType.SNAPSHOT
-
-        copy_job = client.copy_table(
-            sources=source_table_path,
-            destination=snapshot_table_path,
-            job_config=copy_config,
-        )
-        copy_job.result()
-
-        snapshot_table = client.get_table(snapshot_table_path)
-        self.to_delete.insert(0, snapshot_table)
-
-        # Modify data in original table.
-        sql = f'INSERT INTO `{source_table_path}`(foo, bar) VALUES (3, "three")'
-        query_job = client.query(sql)
-        query_job.result()
-
-        # List rows from the source table and compare them to rows from the snapshot.
-        rows_iter = client.list_rows(source_table_path)
-        rows = sorted(row.values() for row in rows_iter)
-        assert rows == [(1, "one"), (2, "two"), (3, "three")]
-
-        rows_iter = client.list_rows(snapshot_table_path)
-        rows = sorted(row.values() for row in rows_iter)
-        assert rows == [(1, "one"), (2, "two")]
-
-        # Now restore the table from the snapshot and it should again contain the old
-        # set of rows.
-        copy_config = CopyJobConfig()
-        copy_config.operation_type = OperationType.RESTORE
-        copy_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
-
-        copy_job = client.copy_table(
-            sources=snapshot_table_path,
-            destination=source_table_path,
-            job_config=copy_config,
-        )
-        copy_job.result()
-
-        rows_iter = client.list_rows(source_table_path)
-        rows = sorted(row.values() for row in rows_iter)
-        assert rows == [(1, "one"), (2, "two")]
-
     def temp_dataset(self, dataset_id, location=None):
Contributor:
This test uses the shared dataset_id fixture, but wants to create the dataset itself -- should it be updated to use a different one?

Contributor Author:
I'm confused. Are you referring to test_table_snapshots? It was using the dataset created in setUp. I don't see it creating a dataset. Am I blind? :)

Contributor:
Sorry, I was referring to the temp_dataset helper (my eye read that as test_dataset). It is used by test_create_dataset, test_update_dataset, etc. Because it isn't a test, it isn't using the dataset_id fixture, but an actual passed-in dataset_id parameter.

I don't know whether all those tests actually need to be using separate datasets, but could imagine that at least some could share.

Contributor Author:
A number of the other tests could and should be refactored to use the shared dataset. In the interest of not making this PR too complex, I opted to save updating the other tests for a future PR.

Contributor Author:
This PR does the minimum to get rid of the mostly-unused dataset creation in setUp.

         project = Config.CLIENT.project
         dataset_ref = bigquery.DatasetReference(project, dataset_id)
@@ -2604,3 +2489,108 @@ def _table_exists(t):
         return True
     except NotFound:
         return False
+
+
+def test_dbapi_create_view(dataset_id):
+
+    query = f"""
+    CREATE VIEW {dataset_id}.dbapi_create_view
+    AS SELECT name, SUM(number) AS total
+    FROM `bigquery-public-data.usa_names.usa_1910_2013`
+    GROUP BY name;
+    """
+
+    Config.CURSOR.execute(query)
+    assert Config.CURSOR.rowcount == 0, "expected 0 rows"
+
+
+def test_parameterized_types_round_trip(dataset_id):
+    client = Config.CLIENT
+    table_id = f"{dataset_id}.test_parameterized_types_round_trip"
+    fields = (
+        ("n", "NUMERIC"),
+        ("n9", "NUMERIC(9)"),
+        ("n92", "NUMERIC(9, 2)"),
+        ("bn", "BIGNUMERIC"),
+        ("bn9", "BIGNUMERIC(38)"),
+        ("bn92", "BIGNUMERIC(38, 22)"),
+        ("s", "STRING"),
+        ("s9", "STRING(9)"),
+        ("b", "BYTES"),
+        ("b9", "BYTES(9)"),
+    )
+    client.query(
+        "create table {} ({})".format(table_id, ", ".join(" ".join(f) for f in fields))
+    ).result()
+    table = client.get_table(table_id)
+    table_id2 = table_id + "2"
+    client.create_table(Table(f"{client.project}.{table_id2}", table.schema))
+    table2 = client.get_table(table_id2)
+
+    assert tuple(s._key()[:2] for s in table2.schema) == fields
+
+
+def test_table_snapshots(dataset_id):
+    from google.cloud.bigquery import CopyJobConfig
+    from google.cloud.bigquery import OperationType
+
+    client = Config.CLIENT
+
+    source_table_path = f"{client.project}.{dataset_id}.test_table"
+    snapshot_table_path = f"{source_table_path}_snapshot"
+
+    # Create the table before loading so that the column order is predictable.
+    schema = [
+        bigquery.SchemaField("foo", "INTEGER"),
+        bigquery.SchemaField("bar", "STRING"),
+    ]
+    source_table = helpers.retry_403(Config.CLIENT.create_table)(
+        Table(source_table_path, schema=schema)
+    )
+
+    # Populate the table with initial data.
+    rows = [{"foo": 1, "bar": "one"}, {"foo": 2, "bar": "two"}]
+    load_job = Config.CLIENT.load_table_from_json(rows, source_table)
+    load_job.result()
+
+    # Now create a snapshot before modifying the original table data.
+    copy_config = CopyJobConfig()
+    copy_config.operation_type = OperationType.SNAPSHOT
+
+    copy_job = client.copy_table(
+        sources=source_table_path,
+        destination=snapshot_table_path,
+        job_config=copy_config,
+    )
+    copy_job.result()
+
+    # Modify data in original table.
+    sql = f'INSERT INTO `{source_table_path}`(foo, bar) VALUES (3, "three")'
+    query_job = client.query(sql)
+    query_job.result()
+
+    # List rows from the source table and compare them to rows from the snapshot.
+    rows_iter = client.list_rows(source_table_path)
+    rows = sorted(row.values() for row in rows_iter)
+    assert rows == [(1, "one"), (2, "two"), (3, "three")]
+
+    rows_iter = client.list_rows(snapshot_table_path)
+    rows = sorted(row.values() for row in rows_iter)
+    assert rows == [(1, "one"), (2, "two")]
+
+    # Now restore the table from the snapshot and it should again contain the old
+    # set of rows.
+    copy_config = CopyJobConfig()
+    copy_config.operation_type = OperationType.RESTORE
+    copy_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+
+    copy_job = client.copy_table(
+        sources=snapshot_table_path,
+        destination=source_table_path,
+        job_config=copy_config,
+    )
+    copy_job.result()
+
+    rows_iter = client.list_rows(source_table_path)
+    rows = sorted(row.values() for row in rows_iter)
+    assert rows == [(1, "one"), (2, "two")]
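The review thread above defers a broader refactor: tests that create their own dataset through the temp_dataset helper could instead share the session-scoped dataset_id fixture. A hypothetical sketch of what that follow-up could look like (test_update_dataset_labels and its assertions are invented for illustration, not taken from this PR):

def test_update_dataset_labels(dataset_id):
    # Hypothetical follow-up refactor: reuse the shared, prefixer-managed
    # dataset. The dataset_id fixture and the autouse cleanup_datasets
    # fixture own its lifecycle, so no per-test creation or to_delete
    # bookkeeping is needed here.
    client = Config.CLIENT
    dataset = client.get_dataset(dataset_id)

    dataset.labels = {"purpose": "system-tests"}
    updated = client.update_dataset(dataset, ["labels"])

    assert updated.labels == {"purpose": "system-tests"}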