tests.system.test_read_gbq_with_bqstorage: test_large_results failed #554

Closed
@flaky-bot

Description

This test failed!

To configure my behavior, see the Flaky Bot documentation.

If I'm commenting on this issue too often, add the flakybot: quiet label and
I will stop commenting.


commit: 388196a
buildURL: Build Status, Sponge
status: failed

Test output
self = 
query = '\n        SELECT\n          total_amount,\n          passenger_count,\n          trip_distance\n        FROM `bigquer...\n          AND passenger_count IS NOT NULL\n          AND trip_distance IS NOT NULL\n        LIMIT 10000000\n        '
max_results = None, progress_bar_type = 'tqdm'
kwargs = {'configuration': {'query': {'destinationTable': {'datasetId': 'python_bigquery_pandas_tests_system_20220811105004_6f0...ead_gbq_w_bqstorage_api_ed49ea8a_e04b_487c_a7b2_0575cf51ad99'}, 'writeDisposition': 'WRITE_TRUNCATE'}}, 'dtypes': None}
RefreshError = 
bigquery = 
pandas = 
job_config = {'query': {'destinationTable': {'datasetId': 'python_bigquery_pandas_tests_system_20220811105004_6f0517', 'projectId':...tableId': 'test_read_gbq_w_bqstorage_api_ed49ea8a_e04b_487c_a7b2_0575cf51ad99'}, 'writeDisposition': 'WRITE_TRUNCATE'}}
config = {'query': {'destinationTable': {'datasetId': 'python_bigquery_pandas_tests_system_20220811105004_6f0517', 'projectId':...tableId': 'test_read_gbq_w_bqstorage_api_ed49ea8a_e04b_487c_a7b2_0575cf51ad99'}, 'writeDisposition': 'WRITE_TRUNCATE'}}
query_reply = QueryJob<project=precise-truck-742, location=US, id=03c9dc2f-f605-4af4-971a-da3c2c10915c>
def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
    from google.auth.exceptions import RefreshError
    from google.cloud import bigquery
    import pandas

    job_config = {
        "query": {
            "useLegacySql": self.dialect
            == "legacy"
            # 'allowLargeResults', 'createDisposition',
            # 'preserveNulls', destinationTable, useQueryCache
        }
    }
    config = kwargs.get("configuration")
    if config is not None:
        job_config.update(config)

    self._start_timer()

    try:
        logger.debug("Requesting query... ")
        query_reply = self.client.query(
            query,
            job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
            location=self.location,
            project=self.project_id,
        )
        logger.debug("Query running...")
    except (RefreshError, ValueError):
        if self.private_key:
            raise AccessDenied("The service account credentials are not valid")
        else:
            raise AccessDenied(
                "The credentials have been revoked or expired, "
                "please re-run the application to re-authorize"
            )
    except self.http_error as ex:
        self.process_http_error(ex)

    job_id = query_reply.job_id
    logger.debug("Job ID: %s" % job_id)

    timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
        "timeoutMs"
    )
    timeout_ms = int(timeout_ms) if timeout_ms else None
    self._wait_for_query_job(query_reply, timeout_ms)

    if query_reply.cache_hit:
        logger.debug("Query done.\nCache hit.\n")
    else:
        bytes_processed = query_reply.total_bytes_processed or 0
        bytes_billed = query_reply.total_bytes_billed or 0
        logger.debug(
            "Query done.\nProcessed: {} Billed: {}".format(
                self.sizeof_fmt(bytes_processed),
                self.sizeof_fmt(bytes_billed),
            )
        )
        logger.debug(
            "Standard price: ${:,.2f} USD\n".format(
                bytes_billed * self.query_price_for_TB
            )
        )

    dtypes = kwargs.get("dtypes")

    # Ensure destination is populated.
    try:
>       query_reply.result()

pandas_gbq/gbq.py:517:


self = QueryJob<project=precise-truck-742, location=US, id=03c9dc2f-f605-4af4-971a-da3c2c10915c>
page_size = None, max_results = None
retry = <google.api_core.retry.Retry object at 0x7fcffa574520>, timeout = None
start_index = None
job_retry = <google.api_core.retry.Retry object at 0x7fcffa590bb0>

def result(  # type: ignore  # (complaints about the overloaded signature)
    self,
    page_size: int = None,
    max_results: int = None,
    retry: "retries.Retry" = DEFAULT_RETRY,
    timeout: float = None,
    start_index: int = None,
    job_retry: "retries.Retry" = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
    """Start the job and wait for it to complete and get the result.

    Args:
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the call that retrieves rows.  This only
            applies to making RPC calls.  It isn't used to retry
            failed jobs.  This has a reasonable default that
            should only be overridden with care. If the job state
            is ``DONE``, retrying is aborted early even if the
            results are not available, as this will not change
            anymore.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
            If multiple requests are made under the hood, ``timeout``
            applies to each individual request.
        start_index (Optional[int]):
            The zero-based index of the starting row to read.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs.  The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry.

            Not all jobs can be retried.  If ``job_id`` was
            provided to the query that created this job, then the
            job returned by the query will not be retryable, and
            an exception will be raised if non-``None``
            non-default ``job_retry`` is also provided.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        google.cloud.exceptions.GoogleAPICallError:
            If the job failed and retries aren't successful.
        concurrent.futures.TimeoutError:
            If the job did not complete in the given timeout.
        TypeError:
            If Non-``None`` and non-default ``job_retry`` is
            provided and the job is not retryable.
    """
    if self.dry_run:
        return _EmptyRowIterator()
    try:
        retry_do_query = getattr(self, "_retry_do_query", None)
        if retry_do_query is not None:
            if job_retry is DEFAULT_JOB_RETRY:
                job_retry = self._job_retry  # type: ignore
        else:
            if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
                raise TypeError(
                    "`job_retry` was provided, but this job is"
                    " not retryable, because a custom `job_id` was"
                    " provided to the query that created this job."
                )

        first = True

        def do_get_result():
            nonlocal first

            if first:
                first = False
            else:
                # Note that we won't get here if retry_do_query is
                # None, because we won't use a retry.

            # The original job failed. Create a new one.
                job = retry_do_query()

                # If it's already failed, we might as well stop:
                if job.done() and job.exception() is not None:
                    raise job.exception()

                # Become the new job:
                self.__dict__.clear()
                self.__dict__.update(job.__dict__)

                # This shouldn't be necessary, because once we have a good
            # job, it should stay good, and we shouldn't have to retry.
                # But let's be paranoid. :)
                self._retry_do_query = retry_do_query
                self._job_retry = job_retry

            super(QueryJob, self).result(retry=retry, timeout=timeout)

            # Since the job could already be "done" (e.g. got a finished job
            # via client.get_job), the superclass call to done() might not
            # set the self._query_results cache.
            self._reload_query_results(retry=retry, timeout=timeout)

        if retry_do_query is not None and job_retry is not None:
            do_get_result = job_retry(do_get_result)
>       do_get_result()

.nox/prerelease/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py:1499:


args = (), kwargs = {}
target = functools.partial(<function QueryJob.result.<locals>.do_get_result at 0x7fcff0076670>)
sleep_generator = <generator object exponential_sleep_generator at 0x7fcff009fcf0>

@functools.wraps(func)
def retry_wrapped_func(*args, **kwargs):
    """A wrapper that calls target function with retry."""
    target = functools.partial(func, *args, **kwargs)
    sleep_generator = exponential_sleep_generator(
        self._initial, self._maximum, multiplier=self._multiplier
    )
>   return retry_target(
        target,
        self._predicate,
        sleep_generator,
        self._deadline,
        on_error=on_error,
    )

.nox/prerelease/lib/python3.8/site-packages/google/api_core/retry.py:283:


target = functools.partial(<function QueryJob.result.<locals>.do_get_result at 0x7fcff0076670>)
predicate = <function _job_should_retry at 0x7fcffa59d670>
sleep_generator = <generator object exponential_sleep_generator at 0x7fcff009fcf0>
deadline = 600.0, on_error = None

def retry_target(target, predicate, sleep_generator, deadline, on_error=None):
    """Call a function and retry if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`Retry`.

    Args:
        target(Callable): The function to call and retry. This must be a
            nullary function - apply arguments with `functools.partial`.
        predicate (Callable[Exception]): A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
        sleep_generator (Iterable[float]): An infinite iterator that determines
            how long to sleep between retries.
        deadline (float): How long to keep retrying the target. The last sleep
            period is shortened as necessary, so that the last retry runs at
            ``deadline`` (and not considerably beyond it).
        on_error (Callable[Exception]): A function to call while processing a
            retryable exception.  Any error raised by this function will *not*
            be caught.

    Returns:
        Any: the return value of the target function.

    Raises:
        google.api_core.RetryError: If the deadline is exceeded while retrying.
        ValueError: If the sleep generator stops yielding values.
        Exception: If the target raises an exception that isn't retryable.
    """
    if deadline is not None:
        deadline_datetime = datetime_helpers.utcnow() + datetime.timedelta(
            seconds=deadline
        )
    else:
        deadline_datetime = None

    last_exc = None

    for sleep in sleep_generator:
        try:
>           return target()

.nox/prerelease/lib/python3.8/site-packages/google/api_core/retry.py:190:


def do_get_result():
    nonlocal first

    if first:
        first = False
    else:
        # Note that we won't get here if retry_do_query is
        # None, because we won't use a retry.

        # The original job failed. Create a new one.
        job = retry_do_query()

        # If it's already failed, we might as well stop:
        if job.done() and job.exception() is not None:
            raise job.exception()

        # Become the new job:
        self.__dict__.clear()
        self.__dict__.update(job.__dict__)

        # This shouldn't be necessary, because once we have a good
        # job, it should stay good, and we shouldn't have to retry.
        # But let's be paranoid. :)
        self._retry_do_query = retry_do_query
        self._job_retry = job_retry
>   super(QueryJob, self).result(retry=retry, timeout=timeout)

.nox/prerelease/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py:1489:


self = QueryJob<project=precise-truck-742, location=US, id=03c9dc2f-f605-4af4-971a-da3c2c10915c>
retry = <google.api_core.retry.Retry object at 0x7fcffa574520>, timeout = None

def result(  # type: ignore  # (signature complaint)
    self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None
) -> "_AsyncJob":
    """Start the job and wait for it to complete and get the result.

    Args:
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. If the job state is ``DONE``, retrying is aborted
            early, as the job will not change anymore.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
            If multiple requests are made under the hood, ``timeout``
            applies to each individual request.

    Returns:
        _AsyncJob: This instance.

    Raises:
        google.cloud.exceptions.GoogleAPICallError:
            if the job failed.
        concurrent.futures.TimeoutError:
            if the job did not complete in the given timeout.
    """
    if self.state is None:
        self._begin(retry=retry, timeout=timeout)

    kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
>   return super(_AsyncJob, self).result(timeout=timeout, **kwargs)

.nox/prerelease/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py:728:


self = QueryJob<project=precise-truck-742, location=US, id=03c9dc2f-f605-4af4-971a-da3c2c10915c>
timeout = None, retry = <google.api_core.retry.Retry object at 0x7fcffa5d8d60>

def result(self, timeout=None, retry=DEFAULT_RETRY):
    """Get the result of the operation, blocking if necessary.

    Args:
        timeout (int):
            How long (in seconds) to wait for the operation to complete.
            If None, wait indefinitely.

    Returns:
        google.protobuf.Message: The Operation's result.

    Raises:
        google.api_core.GoogleAPICallError: If the operation errors or if
            the timeout is reached before the operation completes.
    """
    kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
    self._blocking_poll(timeout=timeout, **kwargs)

    if self._exception is not None:
        # pylint: disable=raising-bad-type
        # Pylint doesn't recognize that this is valid in this case.
>       raise self._exception

E google.api_core.exceptions.NotFound: 404 Not found: Table bigquery-public-data:new_york_taxi_trips.tlc_green_trips_2014 was not found in location US
E
E Location: US
E Job ID: 03c9dc2f-f605-4af4-971a-da3c2c10915c

.nox/prerelease/lib/python3.8/site-packages/google/api_core/future/polling.py:137: NotFound

During handling of the above exception, another exception occurred:

random_dataset = Dataset(DatasetReference('precise-truck-742', 'python_bigquery_pandas_tests_system_20220811105004_6f0517'))
method_under_test = functools.partial(<function read_gbq at 0x7fcff9197f70>, project_id='precise-truck-742', credentials=<google.oauth2.service_account.Credentials object at 0x7fcff90f3040>)

def test_large_results(random_dataset, method_under_test):
>   df = method_under_test(
        """
        SELECT
          total_amount,
          passenger_count,
          trip_distance
        FROM `bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2014`
        -- Select non-null rows for no-copy conversion from Arrow to pandas.
        WHERE total_amount IS NOT NULL
          AND passenger_count IS NOT NULL
          AND trip_distance IS NOT NULL
        LIMIT 10000000
        """,
        use_bqstorage_api=True,
        configuration={
            "query": {
                "destinationTable": {
                    "projectId": random_dataset.project,
                    "datasetId": random_dataset.dataset_id,
                    "tableId": "".join(
                        [
                            "test_read_gbq_w_bqstorage_api_",
                            str(uuid.uuid4()).replace("-", "_"),
                        ]
                    ),
                },
                "writeDisposition": "WRITE_TRUNCATE",
            }
        },
    )

tests/system/test_read_gbq_with_bqstorage.py:47:


pandas_gbq/gbq.py:921: in read_gbq
    final_df = connector.run_query(
pandas_gbq/gbq.py:519: in run_query
    self.process_http_error(ex)


ex = NotFound('Not found: Table bigquery-public-data:new_york_taxi_trips.tlc_green_trips_2014 was not found in location US')

@staticmethod
def process_http_error(ex):
    # See `BigQuery Troubleshooting Errors
    # <https://cloud.google.com/bigquery/troubleshooting-errors>`__

    if "cancelled" in ex.message:
        raise QueryTimeout("Reason: {0}".format(ex))
>   raise GenericGBQException("Reason: {0}".format(ex))

E pandas_gbq.exceptions.GenericGBQException: Reason: 404 Not found: Table bigquery-public-data:new_york_taxi_trips.tlc_green_trips_2014 was not found in location US
E
E Location: US
E Job ID: 03c9dc2f-f605-4af4-971a-da3c2c10915c

pandas_gbq/gbq.py:386: GenericGBQException
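
The failure itself is a 404 on the public source table rather than a bug in the query path: BigQuery reports that bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2014 was not found in location US. A quick way to triage is to check whether that table is still visible outside the test harness. The snippet below is a minimal sketch, assuming Application Default Credentials and the google-cloud-bigquery client; the table ID is copied from the error message above.

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

# Table ID taken from the error message in the test output above.
TABLE_ID = "bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2014"

# Assumes Application Default Credentials are configured locally.
client = bigquery.Client()

try:
    # Metadata-only lookup: no query is run and nothing is billed.
    table = client.get_table(TABLE_ID)
    print(f"Table exists: {table.full_table_id} ({table.num_rows} rows)")
except NotFound:
    # Matches the 404 in the traceback: the public table was removed,
    # renamed, or is not available in this location.
    print(f"{TABLE_ID} was not found")

If this lookup also raises NotFound, the public dataset itself changed and the test's source table needs updating.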


Labels

api: bigquery (Issues related to the googleapis/python-bigquery-pandas API)
flakybot: issue (An issue filed by the Flaky Bot. Should not be added manually.)
priority: p1 (Important issue which blocks shipping the next release. Will be fixed prior to next release.)
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
