diff --git a/.kokoro/continuous/e2e.cfg b/.kokoro/continuous/e2e.cfg
index 774b63313e..3dbd0b47f0 100644
--- a/.kokoro/continuous/e2e.cfg
+++ b/.kokoro/continuous/e2e.cfg
@@ -3,7 +3,7 @@
 # Only run this nox session.
 env_vars: {
   key: "NOX_SESSION"
-  value: "unit_prerelease system_prerelease system_noextras e2e notebook"
+  value: "e2e doctest notebook unit_prerelease system_prerelease system_noextras"
 }
 
 env_vars: {
diff --git a/.kokoro/presubmit/e2e.cfg b/.kokoro/presubmit/e2e.cfg
index 774b63313e..3dbd0b47f0 100644
--- a/.kokoro/presubmit/e2e.cfg
+++ b/.kokoro/presubmit/e2e.cfg
@@ -3,7 +3,7 @@
 # Only run this nox session.
 env_vars: {
   key: "NOX_SESSION"
-  value: "unit_prerelease system_prerelease system_noextras e2e notebook"
+  value: "e2e doctest notebook unit_prerelease system_prerelease system_noextras"
 }
 
 env_vars: {
diff --git a/bigframes/core/log_adapter.py b/bigframes/core/log_adapter.py
index 877e4a9fa1..4afa6037de 100644
--- a/bigframes/core/log_adapter.py
+++ b/bigframes/core/log_adapter.py
@@ -99,9 +99,12 @@ def add_api_method(api_method_name):
         _api_methods = _api_methods[:MAX_LABELS_COUNT]
 
 
-def get_and_reset_api_methods():
+def get_and_reset_api_methods(dry_run: bool = False):
     global _lock
     with _lock:
         previous_api_methods = list(_api_methods)
-        _api_methods.clear()
+
+        # dry_run might not make a job resource, so only reset the log on real queries.
+        if not dry_run:
+            _api_methods.clear()
     return previous_api_methods
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 602865abd9..12c96b90f0 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -2912,7 +2912,9 @@ def to_csv(
             field_delimiter=sep,
             header=header,
         )
-        _, query_job = self._block.expr.session._start_query(export_data_statement)
+        _, query_job = self._block.expr.session._start_query(
+            export_data_statement, api_name="dataframe-to_csv"
+        )
         self._set_internal_query_job(query_job)
 
     def to_json(
@@ -2954,7 +2956,9 @@ def to_json(
             format="JSON",
             export_options={},
         )
-        _, query_job = self._block.expr.session._start_query(export_data_statement)
+        _, query_job = self._block.expr.session._start_query(
+            export_data_statement, api_name="dataframe-to_json"
+        )
         self._set_internal_query_job(query_job)
 
     def to_gbq(
@@ -3086,7 +3090,9 @@ def to_parquet(
             format="PARQUET",
             export_options=export_options,
         )
-        _, query_job = self._block.expr.session._start_query(export_data_statement)
+        _, query_job = self._block.expr.session._start_query(
+            export_data_statement, api_name="dataframe-to_parquet"
+        )
         self._set_internal_query_job(query_job)
 
     def to_dict(
diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index 9eff802cc7..0ae90a28d3 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -24,7 +24,7 @@
 import sys
 import tempfile
 import textwrap
-from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union
+from typing import cast, List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union
 import warnings
 
 import ibis
@@ -133,6 +133,8 @@ def __init__(
         cloud_function_service_account,
         cloud_function_kms_key_name,
         cloud_function_docker_repository,
+        *,
+        session: Session,
     ):
         self._gcp_project_id = gcp_project_id
         self._cloud_function_region = cloud_function_region
@@ -145,6 +147,7 @@ def __init__(
         self._cloud_function_service_account = cloud_function_service_account
         self._cloud_function_kms_key_name = cloud_function_kms_key_name
         self._cloud_function_docker_repository = cloud_function_docker_repository
+        self._session = session
 
     def create_bq_remote_function(
         self,
@@ -216,10 +219,8 @@ def create_bq_remote_function(
         # This requires bigquery.datasets.create IAM permission
         self._bq_client.create_dataset(dataset, exists_ok=True)
 
-        # TODO: Use session._start_query() so we get progress bar
-        query_job = self._bq_client.query(create_function_ddl)  # Make an API request.
-        query_job.result()  # Wait for the job to complete.
-
+        # TODO(swast): plumb through the original, user-facing api_name.
+        _, query_job = self._session._start_query(create_function_ddl)
         logger.info(f"Created remote function {query_job.ddl_target_routine}")
 
     def get_cloud_function_fully_qualified_parent(self):
@@ -910,6 +911,7 @@ def remote_function(
         is_row_processor = False
 
     import bigframes.series
+    import bigframes.session
 
     if input_types == bigframes.series.Series:
         warnings.warn(
@@ -928,7 +930,7 @@ def remote_function(
     # Some defaults may be used from the session if not provided otherwise
     import bigframes.pandas as bpd
 
-    session = session or bpd.get_global_session()
+    session = cast(bigframes.session.Session, session or bpd.get_global_session())
 
     # A BigQuery client is required to perform BQ operations
     if not bigquery_client:
@@ -1040,6 +1042,7 @@ def wrapper(f):
             cloud_function_service_account,
             cloud_function_kms_key_name,
             cloud_function_docker_repository,
+            session=session,  # type: ignore
         )
 
         rf_name, cf_name = remote_function_client.provision_bq_remote_function(
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 3628ecf67b..dc9b847a9b 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -399,6 +399,9 @@ def _set_default_session_location_if_possible(query):
     bqclient = clients_provider.bqclient
 
     if bigframes.session._io.bigquery.is_query(query):
+        # Intentionally run outside of the session so that we can detect the
+        # location before creating the session. Since it's a dry_run, labels
+        # aren't necessary.
         job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
         options.bigquery.location = job.location
     else:
@@ -773,7 +776,10 @@ def clean_up_by_session_id(
         dataset = session._anonymous_dataset
     else:
         dataset = bigframes.session._io.bigquery.create_bq_dataset_reference(
-            client, location=location, project=project
+            client,
+            location=location,
+            project=project,
+            api_name="clean_up_by_session_id",
         )
 
     bigframes.session._io.bigquery.delete_tables_matching_session_id(
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 45c1d15c5a..83481a3ae9 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -235,7 +235,9 @@ def __init__(
 
         self._anonymous_dataset = (
             bigframes.session._io.bigquery.create_bq_dataset_reference(
-                self.bqclient, location=self._location
+                self.bqclient,
+                location=self._location,
+                api_name="session-__init__",
             )
         )
 
@@ -420,9 +422,11 @@ def _query_to_destination(
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
         dry_run_config = bigquery.QueryJobConfig()
         dry_run_config.dry_run = True
-        _, dry_run_job = self._start_query(query, job_config=dry_run_config)
+        _, dry_run_job = self._start_query(
+            query, job_config=dry_run_config, api_name=api_name
+        )
         if dry_run_job.statement_type != "SELECT":
-            _, query_job = self._start_query(query)
+            _, query_job = self._start_query(query, api_name=api_name)
             return query_job.destination, query_job
 
         # Create a table to workaround BigQuery 10 GB query results limit. See:
@@ -451,7 +455,6 @@ def _query_to_destination(
             bigquery.QueryJobConfig,
             bigquery.QueryJobConfig.from_api_repr(configuration),
         )
-        job_config.labels["bigframes-api"] = api_name
         job_config.destination = temp_table
 
         try:
@@ -459,7 +462,10 @@
             # limit. See: internal issue 303057336.
            job_config.labels["error_caught"] = "true"
             _, query_job = self._start_query(
-                query, job_config=job_config, timeout=timeout
+                query,
+                job_config=job_config,
+                timeout=timeout,
+                api_name=api_name,
             )
             return query_job.destination, query_job
         except google.api_core.exceptions.BadRequest:
@@ -467,7 +473,7 @@
             # tables as the destination. For example, if the query has a
             # top-level ORDER BY, this conflicts with our ability to cluster
             # the table by the index column(s).
-            _, query_job = self._start_query(query, timeout=timeout)
+            _, query_job = self._start_query(query, timeout=timeout, api_name=api_name)
             return query_job.destination, query_job
 
     def read_gbq_query(
@@ -811,7 +817,7 @@ def _read_gbq_table(
         dry_run_config = bigquery.QueryJobConfig()
         dry_run_config.dry_run = True
         try:
-            self._start_query(sql, job_config=dry_run_config)
+            self._start_query(sql, job_config=dry_run_config, api_name=api_name)
         except google.api_core.exceptions.NotFound:
             # note that a notfound caused by a simple typo will be
             # caught above when the metadata is fetched, not here
@@ -1777,12 +1783,6 @@ def _prepare_query_job_config(
                 bigframes.options.compute.maximum_bytes_billed
             )
 
-        current_labels = job_config.labels if job_config.labels else {}
-        for key, value in bigframes.options.compute.extra_query_labels.items():
-            if key not in current_labels:
-                current_labels[key] = value
-        job_config.labels = current_labels
-
         if self._bq_kms_key_name:
             job_config.destination_encryption_configuration = (
                 bigquery.EncryptionConfiguration(kms_key_name=self._bq_kms_key_name)
@@ -1818,13 +1818,19 @@ def _start_query(
         job_config: Optional[bigquery.job.QueryJobConfig] = None,
         max_results: Optional[int] = None,
         timeout: Optional[float] = None,
+        api_name: Optional[str] = None,
     ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
         """
         Starts BigQuery query job and waits for results.
         """
         job_config = self._prepare_query_job_config(job_config)
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient, sql, job_config, max_results, timeout
+            self.bqclient,
+            sql,
+            job_config,
+            max_results,
+            timeout,
+            api_name=api_name,
         )
 
     def _start_query_ml_ddl(
@@ -1970,6 +1976,9 @@ def _execute(
             job_config = bigquery.QueryJobConfig(dry_run=dry_run)
         else:
             job_config.dry_run = dry_run
+
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
         return self._start_query(
             sql=sql,
             job_config=job_config,
@@ -1982,6 +1991,9 @@ def _peek(
         if not tree_properties.peekable(self._with_cached_executions(array_value.node)):
             warnings.warn("Peeking this value cannot be done efficiently.")
         sql = self._compile_unordered(array_value).peek_sql(n_rows)
+
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
         return self._start_query(
             sql=sql,
         )
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 28eed47965..cd6847c312 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -28,6 +28,7 @@
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import google.api_core.exceptions
 import google.cloud.bigquery as bigquery
+import google.cloud.bigquery.table
 
 import bigframes
 from bigframes.core import log_adapter
@@ -40,19 +41,34 @@
 # will be limited to this many tables
 
 LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
+CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
 
 
 def create_job_configs_labels(
     job_configs_labels: Optional[Dict[str, str]],
     api_methods: typing.List[str],
+    api_name: Optional[str] = None,
 ) -> Dict[str, str]:
     if job_configs_labels is None:
         job_configs_labels = {}
 
-    if api_methods:
+    # If the user has labels they wish to set, make sure we set those first so
+    # they are preserved.
+    for key, value in bigframes.options.compute.extra_query_labels.items():
+        job_configs_labels[key] = value
+
+    if api_name is not None:
+        job_configs_labels["bigframes-api"] = api_name
+
+    if api_methods and "bigframes-api" not in job_configs_labels:
         job_configs_labels["bigframes-api"] = api_methods[0]
         del api_methods[0]
 
+    # Make sure we always populate bigframes-api with _something_, even if we
+    # have a code path which doesn't populate the list of api_methods. See
+    # internal issue 336521938.
+    job_configs_labels.setdefault("bigframes-api", "unknown")
+
     labels = list(
         itertools.chain(
             job_configs_labels.keys(),
@@ -193,27 +209,33 @@ def format_option(key: str, value: Union[bool, str]) -> str:
     return f"{key}={repr(value)}"
 
 
+def add_labels(job_config, api_name: Optional[str] = None):
+    api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
+    job_config.labels = create_job_configs_labels(
+        job_configs_labels=job_config.labels,
+        api_methods=api_methods,
+        api_name=api_name,
+    )
+
+
 def start_query_with_client(
     bq_client: bigquery.Client,
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
     max_results: Optional[int] = None,
     timeout: Optional[float] = None,
+    api_name: Optional[str] = None,
 ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
     """
     Starts query job and waits for results.
     """
-    if not job_config.dry_run:
-        api_methods = log_adapter.get_and_reset_api_methods()
-        job_config.labels = create_job_configs_labels(
-            job_configs_labels=job_config.labels, api_methods=api_methods
-        )
+    add_labels(job_config, api_name=api_name)
 
     try:
         query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
     except google.api_core.exceptions.Forbidden as ex:
         if "Drive credentials" in ex.message:
-            ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
+            ex.message += CHECK_DRIVE_PERMISSIONS
         raise
 
     opts = bigframes.options.display
@@ -286,7 +308,10 @@ def delete_tables_matching_session_id(
 
 
 def create_bq_dataset_reference(
-    bq_client: bigquery.Client, location=None, project=None
+    bq_client: bigquery.Client,
+    location=None,
+    project=None,
+    api_name: str = "unknown",
 ) -> bigquery.DatasetReference:
     """Create and identify dataset(s) for temporary BQ resources.
 
@@ -307,7 +332,11 @@
     Returns:
         bigquery.DatasetReference: The constructed reference to the anonymous dataset.
     """
-    query_job = bq_client.query("SELECT 1", location=location, project=project)
+    job_config = google.cloud.bigquery.QueryJobConfig()
+    add_labels(job_config, api_name=api_name)
+    query_job = bq_client.query(
+        "SELECT 1", location=location, project=project, job_config=job_config
+    )
     query_job.result()  # blocks until finished
 
     # The anonymous dataset is used by BigQuery to write query results and
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 063dde2a24..e00892fce9 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -113,8 +113,10 @@ def get_table_metadata(
     # atomically.
     table = bqclient.get_table(table_ref)
 
+    # TODO(swast): Use session._start_query instead?
+    # TODO(swast): Use query_and_wait since we know these are small results.
     job_config = bigquery.QueryJobConfig()
-    job_config.labels["bigframes-api"] = api_name
+    bigframes.session._io.bigquery.add_labels(job_config, api_name=api_name)
     snapshot_timestamp = list(
         bqclient.query(
             "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
@@ -285,57 +287,6 @@ def get_index_cols(
     return index_cols
 
 
-def get_time_travel_datetime_and_table_metadata(
-    bqclient: bigquery.Client,
-    table_ref: bigquery.TableReference,
-    *,
-    api_name: str,
-    cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
-    use_cache: bool = True,
-) -> Tuple[datetime.datetime, bigquery.Table]:
-    cached_table = cache.get(table_ref)
-    if use_cache and cached_table is not None:
-        snapshot_timestamp, _ = cached_table
-
-        # Cache hit could be unexpected. See internal issue 329545805.
-        # Raise a warning with more information about how to avoid the
-        # problems with the cache.
-        warnings.warn(
-            f"Reading cached table from {snapshot_timestamp} to avoid "
-            "incompatibilies with previous reads of this table. To read "
-            "the latest version, set `use_cache=False` or close the "
-            "current session with Session.close() or "
-            "bigframes.pandas.close_session().",
-            # There are many layers before we get to (possibly) the user's code:
-            # pandas.read_gbq_table
-            # -> with_default_session
-            # -> Session.read_gbq_table
-            # -> _read_gbq_table
-            # -> _get_snapshot_sql_and_primary_key
-            # -> get_snapshot_datetime_and_table_metadata
-            stacklevel=7,
-        )
-        return cached_table
-
-    # TODO(swast): It's possible that the table metadata is changed between now
-    # and when we run the CURRENT_TIMESTAMP() query to see when we can time
-    # travel to. Find a way to fetch the table metadata and BQ's current time
-    # atomically.
-    table = bqclient.get_table(table_ref)
-
-    job_config = bigquery.QueryJobConfig()
-    job_config.labels["bigframes-api"] = api_name
-    snapshot_timestamp = list(
-        bqclient.query(
-            "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
-            job_config=job_config,
-        ).result()
-    )[0][0]
-    cached_table = (snapshot_timestamp, table)
-    cache[table_ref] = cached_table
-    return cached_table
-
-
 def to_array_value_with_total_ordering(
     session: bigframes.session.Session,
     table_expression: ibis_types.Table,
diff --git a/noxfile.py b/noxfile.py
index c816ec5f51..c6e8da8c81 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -91,7 +91,6 @@
     "unit",
     "unit_noextras",
     "system",
-    "doctest",
     "cover",
 ]
 
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index c617eab7f5..9631e0c7ab 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -545,18 +545,22 @@ def test_read_gbq_with_configuration(
 def test_read_gbq_with_custom_global_labels(
     session: bigframes.Session, scalars_table_id: str
 ):
-    bigframes.options.compute.assign_extra_query_labels(test1=1, test2="abc")
-    bigframes.options.compute.extra_query_labels["test3"] = False
+    # Ensure we use thread-local variables to avoid conflicts with parallel tests.
+    with bigframes.option_context("compute.extra_query_labels", {}):
+        bigframes.options.compute.assign_extra_query_labels(test1=1, test2="abc")
+        bigframes.options.compute.extra_query_labels["test3"] = False
 
-    job_labels = session.read_gbq(scalars_table_id).query_job.labels  # type:ignore
-    expected_labels = {"test1": "1", "test2": "abc", "test3": "false"}
+        job_labels = session.read_gbq(scalars_table_id).query_job.labels  # type:ignore
+        expected_labels = {"test1": "1", "test2": "abc", "test3": "false"}
 
-    assert all(job_labels.get(key) == value for key, value in expected_labels.items())
+        # All jobs should include a bigframes-api key. See internal issue 336521938.
+        assert "bigframes-api" in job_labels
 
-    del bigframes.options.compute.extra_query_labels["test1"]
-    del bigframes.options.compute.extra_query_labels["test2"]
-    del bigframes.options.compute.extra_query_labels["test3"]
+        assert all(
+            job_labels.get(key) == value for key, value in expected_labels.items()
+        )
 
+    # No labels outside of the option_context.
     assert len(bigframes.options.compute.extra_query_labels) == 0
 
 
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index 8ba13a7276..d687643c8a 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -36,6 +36,25 @@ def test_create_job_configs_labels_is_none():
     assert labels == expected_dict
 
 
+def test_create_job_configs_labels_always_includes_bigframes_api():
+    labels = io_bq.create_job_configs_labels(None, [])
+    assert labels == {
+        "bigframes-api": "unknown",
+    }
+
+
+def test_create_job_configs_labels_includes_extra_query_labels():
+    user_labels = {"my-label-1": "my-value-1", "my-label-2": "my-value-2"}
+
+    with bigframes.option_context("compute.extra_query_labels", user_labels):
+        labels = io_bq.create_job_configs_labels(None, [])
+        assert labels == {
+            "my-label-1": "my-value-1",
+            "my-label-2": "my-value-2",
+            "bigframes-api": "unknown",
+        }
+
+
 def test_create_job_configs_labels_length_limit_not_met():
     cur_labels = {
         "source": "bigquery-dataframes-temp",
@@ -105,7 +124,7 @@ def test_create_job_configs_labels_length_limit_met():
         "bigframes-api": "read_pandas",
         "source": "bigquery-dataframes-temp",
     }
-    for i in range(100):
+    for i in range(61):
         key = f"bigframes-api-test-{i}"
         value = f"test{i}"
         cur_labels[key] = value