Commit 3314dfb

fix: apply timeout to all resumable upload requests (#1070)
* fix: apply timeout to all resumable upload requests
* Fix stub in test case
* Improve timeout type and other type annotations
* Annotate return type of _do_resumable_upload()
1 parent 21cd710 commit 3314dfb
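
In practice, this means a timeout passed to the `load_table_from_*` helpers now bounds every request of a chunked resumable upload, not just the initial one. A caller-side sketch of the new behavior (project and table IDs below are placeholders, not from this commit):

```python
from google.cloud import bigquery

client = bigquery.Client()

with open("rows.csv", "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        "my-project.my_dataset.my_table",  # placeholder table ID
        job_config=bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
        ),
        # With this fix, the timeout (a scalar or a (connect, read) tuple)
        # is forwarded to every resumable-upload chunk request.
        timeout=(5.0, 120.0),
    )
job.result()  # wait for the load job to finish
```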

2 files changed: +128 -76 lines changed

google/cloud/bigquery/client.py

Lines changed: 111 additions & 75 deletions
```diff
@@ -31,9 +31,10 @@
 import typing
 from typing import (
     Any,
-    BinaryIO,
     Dict,
+    IO,
     Iterable,
+    Mapping,
     List,
     Optional,
     Sequence,
@@ -112,10 +113,15 @@
 pyarrow = _helpers.PYARROW_VERSIONS.try_import()

 TimeoutType = Union[float, None]
+ResumableTimeoutType = Union[
+    None, float, Tuple[float, float]
+]  # for resumable media methods

 if typing.TYPE_CHECKING:  # pragma: NO COVER
     # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition.
     PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]
+    import pandas  # type: ignore
+    import requests  # required by api-core

 _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024  # 100 MB
 _MAX_MULTIPART_SIZE = 5 * 1024 * 1024
```
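
The new alias widens the timeout values accepted on the resumable-media code paths. A quick illustration of the three shapes it admits (variable names here are illustrative only):

```python
from typing import Tuple, Union

ResumableTimeoutType = Union[None, float, Tuple[float, float]]

no_timeout: ResumableTimeoutType = None            # wait indefinitely
flat_timeout: ResumableTimeoutType = 60.0          # one bound for connect and read
split_timeout: ResumableTimeoutType = (5.0, 60.0)  # (connect_timeout, read_timeout)
```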
```diff
@@ -2348,7 +2354,7 @@ def load_table_from_uri(

     def load_table_from_file(
         self,
-        file_obj: BinaryIO,
+        file_obj: IO[bytes],
         destination: Union[Table, TableReference, TableListItem, str],
         rewind: bool = False,
         size: int = None,
```
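
A side note on the annotation change: `IO[bytes]` is the conventional `typing` spelling for a binary file-like object and is more permissive than `BinaryIO` for wrapped or duck-typed streams. A minimal sketch (the function name is invented for illustration):

```python
import io
from typing import IO

def read_all(stream: IO[bytes]) -> bytes:
    # Accepts io.BytesIO, files opened with open(..., "rb"), and other
    # binary file-like objects.
    return stream.read()

read_all(io.BytesIO(b"a,b,c"))
```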
```diff
@@ -2358,50 +2364,50 @@ def load_table_from_file(
         location: str = None,
         project: str = None,
         job_config: LoadJobConfig = None,
-        timeout: TimeoutType = DEFAULT_TIMEOUT,
+        timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
     ) -> job.LoadJob:
         """Upload the contents of this table from a file-like object.

         Similar to :meth:`load_table_from_uri`, this method creates, starts and
         returns a :class:`~google.cloud.bigquery.job.LoadJob`.

         Args:
-            file_obj (file): A file handle opened in binary mode for reading.
-            destination (Union[ \
-                google.cloud.bigquery.table.Table, \
-                google.cloud.bigquery.table.TableReference, \
-                google.cloud.bigquery.table.TableListItem, \
-                str, \
-            ]):
+            file_obj:
+                A file handle opened in binary mode for reading.
+            destination:
                 Table into which data is to be loaded. If a string is passed
                 in, this method attempts to create a table reference from a
                 string using
                 :func:`google.cloud.bigquery.table.TableReference.from_string`.

         Keyword Arguments:
-            rewind (Optional[bool]):
+            rewind:
                 If True, seek to the beginning of the file handle before
                 reading the file.
-            size (Optional[int]):
+            size:
                 The number of bytes to read from the file handle. If size is
                 ``None`` or large, resumable upload will be used. Otherwise,
                 multipart upload will be used.
-            num_retries (Optional[int]): Number of upload retries. Defaults to 6.
-            job_id (Optional[str]): Name of the job.
-            job_id_prefix (Optional[str]):
+            num_retries: Number of upload retries. Defaults to 6.
+            job_id: Name of the job.
+            job_id_prefix:
                 The user-provided prefix for a randomly generated job ID.
                 This parameter will be ignored if a ``job_id`` is also given.
-            location (Optional[str]):
+            location:
                 Location where to run the job. Must match the location of the
                 destination table.
-            project (Optional[str]):
+            project:
                 Project ID of the project of where to run the job. Defaults
                 to the client's project.
-            job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
+            job_config:
                 Extra configuration options for the job.
-            timeout (Optional[float]):
+            timeout:
                 The number of seconds to wait for the underlying HTTP transport
-                before using ``retry``.
+                before using ``retry``. Depending on the retry strategy, a request
+                may be repeated several times using the same timeout each time.
+
+                Can also be passed as a tuple (connect_timeout, read_timeout).
+                See :meth:`requests.Session.request` documentation for details.

         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
```
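
The tuple form the docstring now mentions follows the ``requests`` convention, where a timeout may be a single number or a ``(connect, read)`` pair; a quick standalone illustration:

```python
import requests

# One number bounds both connection setup and each socket read; a pair
# bounds them separately, which suits long-running uploads.
requests.get("https://example.com", timeout=60.0)
requests.get("https://example.com", timeout=(5.0, 60.0))
```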
```diff
@@ -2453,7 +2459,7 @@ def load_table_from_file(

     def load_table_from_dataframe(
         self,
-        dataframe,
+        dataframe: "pandas.DataFrame",
         destination: Union[Table, TableReference, str],
         num_retries: int = _DEFAULT_NUM_RETRIES,
         job_id: str = None,
@@ -2462,7 +2468,7 @@ def load_table_from_dataframe(
         project: str = None,
         job_config: LoadJobConfig = None,
         parquet_compression: str = "snappy",
-        timeout: TimeoutType = DEFAULT_TIMEOUT,
+        timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
     ) -> job.LoadJob:
         """Upload the contents of a table from a pandas DataFrame.

```
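
For callers, the widened annotation means the same tuple timeout can be used when loading a DataFrame. A sketch (client setup and table ID are placeholders):

```python
import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"name": ["Ada", "Grace"], "age": [36, 85]})

job = client.load_table_from_dataframe(
    dataframe,
    "my-project.my_dataset.people",  # placeholder table ID
    timeout=(5.0, 120.0),            # (connect_timeout, read_timeout)
)
job.result()  # wait for the load job to finish
```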
```diff
@@ -2481,9 +2487,9 @@ def load_table_from_dataframe(
         https://github.com/googleapis/python-bigquery/issues/19

         Args:
-            dataframe (pandas.DataFrame):
+            dataframe:
                 A :class:`~pandas.DataFrame` containing the data to load.
-            destination (google.cloud.bigquery.table.TableReference):
+            destination:
                 The destination table to use for loading the data. If it is an
                 existing table, the schema of the :class:`~pandas.DataFrame`
                 must match the schema of the destination table. If the table
@@ -2495,19 +2501,19 @@ def load_table_from_dataframe(
             :func:`google.cloud.bigquery.table.TableReference.from_string`.

         Keyword Arguments:
-            num_retries (Optional[int]): Number of upload retries.
-            job_id (Optional[str]): Name of the job.
-            job_id_prefix (Optional[str]):
+            num_retries: Number of upload retries.
+            job_id: Name of the job.
+            job_id_prefix:
                 The user-provided prefix for a randomly generated
                 job ID. This parameter will be ignored if a ``job_id`` is
                 also given.
-            location (Optional[str]):
+            location:
                 Location where to run the job. Must match the location of the
                 destination table.
-            project (Optional[str]):
+            project:
                 Project ID of the project of where to run the job. Defaults
                 to the client's project.
-            job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
+            job_config:
                 Extra configuration options for the job.

                 To override the default pandas data type conversions, supply
@@ -2524,7 +2530,7 @@ def load_table_from_dataframe(
                 :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
                 :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
                 supported.
-            parquet_compression (Optional[str]):
+            parquet_compression:
                 [Beta] The compression method to use if intermittently
                 serializing ``dataframe`` to a parquet file.

@@ -2537,9 +2543,13 @@ def load_table_from_dataframe(
                 passed as the ``compression`` argument to the underlying
                 ``DataFrame.to_parquet()`` method.
                 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
-            timeout (Optional[float]):
+            timeout:
                 The number of seconds to wait for the underlying HTTP transport
-                before using ``retry``.
+                before using ``retry``. Depending on the retry strategy, a request may
+                be repeated several times using the same timeout each time.
+
+                Can also be passed as a tuple (connect_timeout, read_timeout).
+                See :meth:`requests.Session.request` documentation for details.

         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
```
```diff
@@ -2717,7 +2727,7 @@ def load_table_from_json(
         location: str = None,
         project: str = None,
         job_config: LoadJobConfig = None,
-        timeout: TimeoutType = DEFAULT_TIMEOUT,
+        timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
     ) -> job.LoadJob:
         """Upload the contents of a table from a JSON string or dict.

@@ -2741,36 +2751,35 @@ def load_table_from_json(
                 client = bigquery.Client()
                 client.load_table_from_file(data_as_file, ...)

-            destination (Union[ \
-                google.cloud.bigquery.table.Table, \
-                google.cloud.bigquery.table.TableReference, \
-                google.cloud.bigquery.table.TableListItem, \
-                str, \
-            ]):
+            destination:
                 Table into which data is to be loaded. If a string is passed
                 in, this method attempts to create a table reference from a
                 string using
                 :func:`google.cloud.bigquery.table.TableReference.from_string`.

         Keyword Arguments:
-            num_retries (Optional[int]): Number of upload retries.
-            job_id (Optional[str]): Name of the job.
-            job_id_prefix (Optional[str]):
+            num_retries: Number of upload retries.
+            job_id: Name of the job.
+            job_id_prefix:
                 The user-provided prefix for a randomly generated job ID.
                 This parameter will be ignored if a ``job_id`` is also given.
-            location (Optional[str]):
+            location:
                 Location where to run the job. Must match the location of the
                 destination table.
-            project (Optional[str]):
+            project:
                 Project ID of the project of where to run the job. Defaults
                 to the client's project.
-            job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
+            job_config:
                 Extra configuration options for the job. The ``source_format``
                 setting is always set to
                 :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
-            timeout (Optional[float]):
+            timeout:
                 The number of seconds to wait for the underlying HTTP transport
-                before using ``retry``.
+                before using ``retry``. Depending on the retry strategy, a request may
+                be repeated several times using the same timeout each time.
+
+                Can also be passed as a tuple (connect_timeout, read_timeout).
+                See :meth:`requests.Session.request` documentation for details.

         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
```
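
The same widened timeout applies to ``load_table_from_json``. A small example, reusing the ``client`` from the earlier sketch (the table ID is again a placeholder):

```python
rows = [
    {"name": "Ada", "age": 42},
    {"name": "Grace", "age": 85},
]
job = client.load_table_from_json(
    rows,
    "my-project.my_dataset.people",  # placeholder table ID
    timeout=(5.0, 120.0),
)
job.result()
```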
```diff
@@ -2819,60 +2828,77 @@ def load_table_from_json(
         )

     def _do_resumable_upload(
-        self, stream, metadata, num_retries, timeout, project=None
-    ):
+        self,
+        stream: IO[bytes],
+        metadata: Mapping[str, str],
+        num_retries: int,
+        timeout: Optional[ResumableTimeoutType],
+        project: Optional[str] = None,
+    ) -> "requests.Response":
         """Perform a resumable upload.

         Args:
-            stream (IO[bytes]): A bytes IO object open for reading.
+            stream: A bytes IO object open for reading.

-            metadata (Dict): The metadata associated with the upload.
+            metadata: The metadata associated with the upload.

-            num_retries (int):
+            num_retries:
                 Number of upload retries. (Deprecated: This
                 argument will be removed in a future release.)

-            timeout (float):
+            timeout:
                 The number of seconds to wait for the underlying HTTP transport
-                before using ``retry``.
+                before using ``retry``. Depending on the retry strategy, a request may
+                be repeated several times using the same timeout each time.

-            project (Optional[str]):
+                Can also be passed as a tuple (connect_timeout, read_timeout).
+                See :meth:`requests.Session.request` documentation for details.
+
+            project:
                 Project ID of the project of where to run the upload. Defaults
                 to the client's project.

         Returns:
-            requests.Response:
-                The "200 OK" response object returned after the final chunk
-                is uploaded.
+            The "200 OK" response object returned after the final chunk
+            is uploaded.
         """
         upload, transport = self._initiate_resumable_upload(
             stream, metadata, num_retries, timeout, project=project
         )

         while not upload.finished:
-            response = upload.transmit_next_chunk(transport)
+            response = upload.transmit_next_chunk(transport, timeout=timeout)

         return response

     def _initiate_resumable_upload(
-        self, stream, metadata, num_retries, timeout, project=None
+        self,
+        stream: IO[bytes],
+        metadata: Mapping[str, str],
+        num_retries: int,
+        timeout: Optional[ResumableTimeoutType],
+        project: Optional[str] = None,
     ):
         """Initiate a resumable upload.

         Args:
-            stream (IO[bytes]): A bytes IO object open for reading.
+            stream: A bytes IO object open for reading.

-            metadata (Dict): The metadata associated with the upload.
+            metadata: The metadata associated with the upload.

-            num_retries (int):
+            num_retries:
                 Number of upload retries. (Deprecated: This
                 argument will be removed in a future release.)

-            timeout (float):
+            timeout:
                 The number of seconds to wait for the underlying HTTP transport
-                before using ``retry``.
+                before using ``retry``. Depending on the retry strategy, a request may
+                be repeated several times using the same timeout each time.

-            project (Optional[str]):
+                Can also be passed as a tuple (connect_timeout, read_timeout).
+                See :meth:`requests.Session.request` documentation for details.
+
+            project:
                 Project ID of the project of where to run the upload. Defaults
                 to the client's project.

```
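
The behavioral fix is the one-line change inside `_do_resumable_upload`: the chunk-transmission loop now forwards `timeout` to every `transmit_next_chunk` call instead of applying it only to the initiation request. Reduced to its essence (a sketch of the pattern, not the full client code):

```python
def do_resumable_upload_sketch(upload, transport, timeout):
    # Before #1070, transmit_next_chunk(transport) was called without a
    # timeout, so each chunk request fell back to the resumable-media
    # library default. After #1070, the caller's timeout bounds every
    # chunk request as well.
    while not upload.finished:
        response = upload.transmit_next_chunk(transport, timeout=timeout)
    return response
```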
```diff
@@ -2921,29 +2947,39 @@ def _initiate_resumable_upload(
         return upload, transport

     def _do_multipart_upload(
-        self, stream, metadata, size, num_retries, timeout, project=None
+        self,
+        stream: IO[bytes],
+        metadata: Mapping[str, str],
+        size: int,
+        num_retries: int,
+        timeout: Optional[ResumableTimeoutType],
+        project: Optional[str] = None,
     ):
         """Perform a multipart upload.

         Args:
-            stream (IO[bytes]): A bytes IO object open for reading.
+            stream: A bytes IO object open for reading.

-            metadata (Dict): The metadata associated with the upload.
+            metadata: The metadata associated with the upload.

-            size (int):
+            size:
                 The number of bytes to be uploaded (which will be read
                 from ``stream``). If not provided, the upload will be
                 concluded once ``stream`` is exhausted (or :data:`None`).

-            num_retries (int):
+            num_retries:
                 Number of upload retries. (Deprecated: This
                 argument will be removed in a future release.)

-            timeout (float):
+            timeout:
                 The number of seconds to wait for the underlying HTTP transport
-                before using ``retry``.
+                before using ``retry``. Depending on the retry strategy, a request may
+                be repeated several times using the same timeout each time.

-            project (Optional[str]):
+                Can also be passed as a tuple (connect_timeout, read_timeout).
+                See :meth:`requests.Session.request` documentation for details.
+
+            project:
                 Project ID of the project of where to run the upload. Defaults
                 to the client's project.

```
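
As the ``size`` docstring above notes, the client picks multipart upload for small payloads of known size and the resumable path otherwise. A sketch of that dispatch, using the ``_MAX_MULTIPART_SIZE`` constant from the top of the diff (the helper function is invented for illustration; the real logic lives in ``load_table_from_file``):

```python
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # matches the module constant above

def pick_upload_strategy(size):
    # Small payloads with a known size fit in one multipart request;
    # anything else goes through the chunked, resumable path that this
    # commit fixes.
    if size is not None and size < _MAX_MULTIPART_SIZE:
        return "multipart"
    return "resumable"
```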
