From 40a0ef7cb792174e104e5fe64e6124d42c45aa61 Mon Sep 17 00:00:00 2001
From: Kien Truong
Date: Sun, 3 Nov 2024 10:13:55 +0700
Subject: [PATCH 1/6] feat: support setting max_stream_count when fetching query results

Allow users to set max_stream_count when fetching results using the
BigQuery Storage API with RowIterator's incremental methods:

* to_arrow_iterable
* to_dataframe_iterable
---
 google/cloud/bigquery/table.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index faf827be4..257a5d8e9 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -1812,6 +1812,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as
         a stream.
@@ -1836,6 +1837,17 @@ def to_arrow_iterable(
                 created by the server. If ``max_queue_size`` is :data:`None`, the queue
                 size is infinite.
 
+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using BigQuery Storage API. Ignored if
+                BigQuery Storage API is not used.
+
+                By default, this value is unset, and the number of download
+                streams is determined by the BigQuery server. However, this behaviour
+                can require a lot of memory to store temporary download results,
+                especially with very large queries. If that's the case,
+                this parameter can be used to reduce memory consumption.
+
         Returns:
             pyarrow.RecordBatch:
                 A generator of :class:`~pyarrow.RecordBatch`.
@@ -1852,6 +1864,7 @@ def to_arrow_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -1978,6 +1991,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> "pandas.DataFrame":
         """Create an iterable of pandas DataFrames, to process the table as a
         stream.
@@ -2008,6 +2022,17 @@ def to_dataframe_iterable(
 
             .. versionadded:: 2.14.0
 
+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using BigQuery Storage API. Ignored if
+                BigQuery Storage API is not used.
+
+                By default, this value is unset, and the number of download
+                streams is determined by the BigQuery server. However, this behaviour
+                can require a lot of memory to store temporary download results,
+                especially with very large queries. If that's the case,
+                this parameter can be used to reduce memory consumption.
+
         Returns:
             pandas.DataFrame:
                 A generator of :class:`~pandas.DataFrame`.
@@ -2034,6 +2059,7 @@ def to_dataframe_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_dataframe_row_iterator,

From 5e8811c957ef8131bb95feff8dce81677f3baf2f Mon Sep 17 00:00:00 2001
From: Kien Truong
Date: Sat, 16 Nov 2024 09:24:36 +0700
Subject: [PATCH 2/6] docs: update docs about max_stream_count for ordered queries
---
 google/cloud/bigquery/table.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index 257a5d8e9..3573f5909 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -1842,6 +1842,10 @@ def to_arrow_iterable(
                 using BigQuery Storage API. Ignored if
                 BigQuery Storage API is not used.
 
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY,
+                in which case the number of download streams is always 1.
+
                 By default, this value is unset, and the number of download
                 streams is determined by the BigQuery server. However, this behaviour
                 can require a lot of memory to store temporary download results,
@@ -2027,6 +2031,10 @@ def to_dataframe_iterable(
                 using BigQuery Storage API. Ignored if
                 BigQuery Storage API is not used.
 
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY,
+                in which case the number of download streams is always 1.
+
                 By default, this value is unset, and the number of download
                 streams is determined by the BigQuery server. However, this behaviour
                 can require a lot of memory to store temporary download results,

From 9e20bace043a8a9380eeac4a699940a446e8750f Mon Sep 17 00:00:00 2001
From: Kien Truong
Date: Sat, 16 Nov 2024 09:29:53 +0700
Subject: [PATCH 3/6] fix: add max_stream_count params to _EmptyRowIterator's methods
---
 google/cloud/bigquery/table.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index 3573f5909..e20092a59 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -2724,6 +2724,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pandas.DataFrame"]:
         """Create an iterable of pandas DataFrames, to process the table as a
         stream.
@@ -2739,6 +2740,9 @@ def to_dataframe_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.
 
+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pandas.DataFrame`.
 
@@ -2753,6 +2757,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """Create an iterable of pandas DataFrames, to process the table as a
         stream.
@@ -2765,6 +2770,9 @@ def to_arrow_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.
 
+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
""" From fb726ebb0ed0669358666606b5984056584a12ff Mon Sep 17 00:00:00 2001 From: Kien Truong Date: Sat, 16 Nov 2024 10:38:28 +0700 Subject: [PATCH 4/6] test: add tests for RowIterator's max_stream_count parameter --- tests/unit/test_table.py | 72 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 018a096df..36414f11f 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -5822,3 +5822,75 @@ def test_table_reference_to_bqstorage_v1_stable(table_path): for klass in (mut.TableReference, mut.Table, mut.TableListItem): got = klass.from_string(table_path).to_bqstorage() assert got == expected + + +@pytest.mark.parametrize("preserve_order", [True, False]) +def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order): + pytest.importorskip("pandas") + pytest.importorskip("google.cloud.bigquery_storage") + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud import bigquery_storage + + bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) + session = bigquery_storage.types.ReadSession() + bqstorage_client.create_read_session.return_value = session + + row_iterator = mut.RowIterator( + _mock_client(), + api_request=None, + path=None, + schema=[ + schema.SchemaField("colA", "INTEGER"), + ], + table=mut.TableReference.from_string("proj.dset.tbl"), + ) + row_iterator._preserve_order = preserve_order + + max_stream_count = 132 + result_iterable = row_iterator.to_arrow_iterable( + bqstorage_client=bqstorage_client, max_stream_count=max_stream_count + ) + for _ in result_iterable: # pragma: NO COVER + pass + bqstorage_client.create_read_session.assert_called_once_with( + parent=mock.ANY, + read_session=mock.ANY, + max_stream_count=max_stream_count if not preserve_order else 1, + ) + + +@pytest.mark.parametrize("preserve_order", [True, False]) +def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order): + pytest.importorskip("pandas") + pytest.importorskip("google.cloud.bigquery_storage") + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud import bigquery_storage + + bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) + session = bigquery_storage.types.ReadSession() + bqstorage_client.create_read_session.return_value = session + + row_iterator = mut.RowIterator( + _mock_client(), + api_request=None, + path=None, + schema=[ + schema.SchemaField("colA", "INTEGER"), + ], + table=mut.TableReference.from_string("proj.dset.tbl"), + ) + row_iterator._preserve_order = preserve_order + + max_stream_count = 132 + result_iterable = row_iterator.to_dataframe_iterable( + bqstorage_client=bqstorage_client, max_stream_count=max_stream_count + ) + for _ in result_iterable: # pragma: NO COVER + pass + bqstorage_client.create_read_session.assert_called_once_with( + parent=mock.ANY, + read_session=mock.ANY, + max_stream_count=max_stream_count if not preserve_order else 1, + ) From 2c936b2ce1ea6308be740c6172078d08ad4a0560 Mon Sep 17 00:00:00 2001 From: Kien Truong Date: Sat, 16 Nov 2024 11:05:38 +0700 Subject: [PATCH 5/6] docs: add notes on valid max_stream_count range in docstring --- google/cloud/bigquery/table.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index e20092a59..dcaf377e3 100644 --- a/google/cloud/bigquery/table.py 
+++ b/google/cloud/bigquery/table.py
@@ -1846,11 +1846,12 @@ def to_arrow_iterable(
                 is deterministically ordered with ORDER BY,
                 in which case the number of download streams is always 1.
 
-                By default, this value is unset, and the number of download
+                If set to 0 or None (the default), the number of download
                 streams is determined by the BigQuery server. However, this behaviour
                 can require a lot of memory to store temporary download results,
-                especially with very large queries. If that's the case,
-                this parameter can be used to reduce memory consumption.
+                especially with very large queries. In that case,
+                setting this parameter to a value greater than 0 can help
+                reduce system resource consumption.
 
         Returns:
             pyarrow.RecordBatch:
@@ -2035,11 +2036,12 @@ def to_dataframe_iterable(
                 is deterministically ordered with ORDER BY,
                 in which case the number of download streams is always 1.
 
-                By default, this value is unset, and the number of download
+                If set to 0 or None (the default), the number of download
                 streams is determined by the BigQuery server. However, this behaviour
                 can require a lot of memory to store temporary download results,
-                especially with very large queries. If that's the case,
-                this parameter can be used to reduce memory consumption.
+                especially with very large queries. In that case,
+                setting this parameter to a value greater than 0 can help
+                reduce system resource consumption.
 
         Returns:
             pandas.DataFrame:

From c00c7b3068341fdf3bcdce811e2ecba9455a8be4 Mon Sep 17 00:00:00 2001
From: Lingqing Gan
Date: Sat, 23 Nov 2024 04:11:21 +0800
Subject: [PATCH 6/6] use a different way to iterate results
---
 tests/unit/test_table.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
index 36414f11f..d81ad2dca 100644
--- a/tests/unit/test_table.py
+++ b/tests/unit/test_table.py
@@ -5851,8 +5851,7 @@ def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order):
     result_iterable = row_iterator.to_arrow_iterable(
         bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
     )
-    for _ in result_iterable:  # pragma: NO COVER
-        pass
+    list(result_iterable)
     bqstorage_client.create_read_session.assert_called_once_with(
         parent=mock.ANY,
         read_session=mock.ANY,
@@ -5886,8 +5885,7 @@ def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order):
     result_iterable = row_iterator.to_dataframe_iterable(
         bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
     )
-    for _ in result_iterable:  # pragma: NO COVER
-        pass
+    list(result_iterable)
     bqstorage_client.create_read_session.assert_called_once_with(
         parent=mock.ANY,
         read_session=mock.ANY,
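
A minimal usage sketch of the new parameter introduced by these patches, assuming a standard BigQuery client setup with the BigQuery Storage API installed and credentials taken from the environment; the sample query and the chosen stream limit are illustrative assumptions, only the max_stream_count keyword itself comes from this series:

    from google.cloud import bigquery
    from google.cloud import bigquery_storage

    # Assumed setup: project and credentials come from the environment.
    client = bigquery.Client()
    bqstorage_client = bigquery_storage.BigQueryReadClient()

    # Any large query works here; this public dataset is only an example.
    row_iterator = client.query(
        "SELECT * FROM `bigquery-public-data.samples.shakespeare`"
    ).result()

    # Cap the number of parallel Storage API read streams to bound memory use.
    # For deterministically ordered (ORDER BY) results, the stream count is forced to 1.
    for frame in row_iterator.to_dataframe_iterable(
        bqstorage_client=bqstorage_client,
        max_stream_count=2,  # illustrative value; 0 or None lets the server decide
    ):
        print(len(frame))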