diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py index ae8dc88d43..883087df07 100644 --- a/bigframes/session/bigquery_session.py +++ b/bigframes/session/bigquery_session.py @@ -84,7 +84,9 @@ def create_temp_table( ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}" - job = self.bqclient.query(ddl, job_config=job_config) + job = self.bqclient.query( + ddl, job_config=job_config, location=self.location + ) job.result() # return the fully qualified table, so it can be used outside of the session return job.destination @@ -94,7 +96,10 @@ def close(self): self._sessiondaemon.stop() if self._session_id is not None and self.bqclient is not None: - self.bqclient.query_and_wait(f"CALL BQ.ABORT_SESSION('{self._session_id}')") + self.bqclient.query_and_wait( + f"CALL BQ.ABORT_SESSION('{self._session_id}')", + location=self.location, + ) def _get_session_id(self) -> str: if self._session_id: diff --git a/tests/system/large/test_location.py b/tests/system/large/test_location.py index d4428c1f95..3ebe2bb040 100644 --- a/tests/system/large/test_location.py +++ b/tests/system/large/test_location.py @@ -14,8 +14,6 @@ import typing -from google.cloud import bigquery -from google.cloud.bigquery_storage import types as bqstorage_types import pandas import pandas.testing import pytest @@ -41,7 +39,15 @@ def _assert_bq_execution_location( if expected_location is None: expected_location = session._location - assert typing.cast(bigquery.QueryJob, df.query_job).location == expected_location + query_job = df.query_job + assert query_job is not None + assert query_job.location == expected_location + destination = query_job.destination + assert destination is not None + destination_dataset = session.bqclient.get_dataset( + f"{destination.project}.{destination.dataset_id}" + ) + assert destination_dataset.location == expected_location # Ensure operation involving BQ client suceeds result = ( @@ -52,38 +58,28 @@ def _assert_bq_execution_location( .head() ) - assert ( - typing.cast(bigquery.QueryJob, result.query_job).location == expected_location + # Use allow_large_results = True to force a job to be created. + result_pd = result.to_pandas(allow_large_results=True) + + query_job = df.query_job + assert query_job is not None + assert query_job.location == expected_location + destination = query_job.destination + assert destination is not None + destination_dataset = session.bqclient.get_dataset( + f"{destination.project}.{destination.dataset_id}" ) + assert destination_dataset.location == expected_location expected_result = pandas.DataFrame( {"number": [444, 222]}, index=pandas.Index(["aaa", "bbb"], name="name") ) pandas.testing.assert_frame_equal( - expected_result, result.to_pandas(), check_dtype=False, check_index_type=False - ) - - # Ensure BQ Storage Read client operation succceeds - table = result.query_job.destination - requested_session = bqstorage_types.ReadSession( # type: ignore[attr-defined] - table=f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}", - data_format=bqstorage_types.DataFormat.ARROW, # type: ignore[attr-defined] - ) - read_session = session.bqstoragereadclient.create_read_session( - parent=f"projects/{table.project}", - read_session=requested_session, - max_stream_count=1, + expected_result, + result_pd, + check_dtype=False, + check_index_type=False, ) - reader = session.bqstoragereadclient.read_rows(read_session.streams[0].name) - frames = [] - for message in reader.rows().pages: - frames.append(message.to_dataframe()) - read_dataframe = pandas.concat(frames) - # normalize before comparing since we lost some of the bigframes column - # naming abtractions in the direct read of the destination table - read_dataframe = read_dataframe.set_index("name") - read_dataframe.columns = result.columns - pandas.testing.assert_frame_equal(expected_result, read_dataframe) def test_bq_location_default():