From a710dfb8ba1511b243050d95968e7b368ebc107b Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 4 Jun 2024 17:54:44 +0000 Subject: [PATCH 1/6] fix: Improve to_pandas_batches for large results --- bigframes/core/blocks.py | 14 ++++++++++++-- bigframes/dataframe.py | 8 ++++++-- tests/system/load/test_large_tables.py | 10 ++++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index ea063669d5..d04c8ade9a 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -508,11 +508,21 @@ def try_peek( else: return None - def to_pandas_batches(self): + def to_pandas_batches( + self, page_size: Optional[int] = None, max_results: Optional[int] = None + ): """Download results one message at a time.""" dtypes = dict(zip(self.index_columns, self.index.dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) - results_iterator, _ = self.session._execute(self.expr, sorted=True) + _, query_job = self.session._query_to_destination( + self.session._to_sql(self.expr, sorted=True), + list(self.index_columns), + api_name="cached", + do_clustering=False, + ) + results_iterator = query_job.result( + page_size=page_size, max_results=max_results + ) for arrow_table in results_iterator.to_arrow_iterable( bqstorage_client=self.session.bqstoragereadclient ): diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index e404e439ab..28b2dde139 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1215,10 +1215,14 @@ def to_pandas( self._set_internal_query_job(query_job) return df.set_axis(self._block.column_labels, axis=1, copy=False) - def to_pandas_batches(self) -> Iterable[pandas.DataFrame]: + def to_pandas_batches( + self, page_size: Optional[int] = None, max_results: Optional[int] = None + ) -> Iterable[pandas.DataFrame]: """Stream DataFrame results to an iterable of pandas DataFrame""" self._optimize_query_complexity() - return self._block.to_pandas_batches() + return self._block.to_pandas_batches( + page_size=page_size, max_results=max_results + ) def _compute_dry_run(self) -> bigquery.QueryJob: return self._block._compute_dry_run() diff --git a/tests/system/load/test_large_tables.py b/tests/system/load/test_large_tables.py index cf1c787a58..d7d0dca37e 100644 --- a/tests/system/load/test_large_tables.py +++ b/tests/system/load/test_large_tables.py @@ -76,19 +76,21 @@ def test_index_repr_large_table(): def test_to_pandas_batches_large_table(): df = bpd.read_gbq("load_testing.scalars_10gb") + # double size to catch 10gb limit errors + for column in df: + df[column + "_2"] = df[column] # df will be downloaded locally expected_row_count, expected_column_count = df.shape row_count = 0 - # TODO(b/340890167): fix type error - for df in df.to_pandas_batches(): # type: ignore - batch_row_count, batch_column_count = df.shape + for pdf in df.to_pandas_batches(page_size=500000): + batch_row_count, batch_column_count = pdf.shape assert batch_column_count == expected_column_count row_count += batch_row_count # Attempt to save on memory by manually removing the batch df # from local memory after finishing with processing. 
- del df + del pdf assert row_count == expected_row_count From 798a915657af8fbe0f57dfb198fb7831eec5aa3f Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Wed, 5 Jun 2024 00:13:10 +0000 Subject: [PATCH 2/6] remove page_size and use 1tb --- bigframes/core/blocks.py | 10 +++------- bigframes/dataframe.py | 8 ++------ tests/system/load/test_large_tables.py | 22 +++++++--------------- 3 files changed, 12 insertions(+), 28 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index d04c8ade9a..e4554ad2b4 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -508,9 +508,7 @@ def try_peek( else: return None - def to_pandas_batches( - self, page_size: Optional[int] = None, max_results: Optional[int] = None - ): + def to_pandas_batches(self): """Download results one message at a time.""" dtypes = dict(zip(self.index_columns, self.index.dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) @@ -520,11 +518,9 @@ def to_pandas_batches( api_name="cached", do_clustering=False, ) - results_iterator = query_job.result( - page_size=page_size, max_results=max_results - ) + results_iterator = query_job.result(page_size=50) for arrow_table in results_iterator.to_arrow_iterable( - bqstorage_client=self.session.bqstoragereadclient + # bqstorage_client=self.session.bqstoragereadclient ): df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes) self._copy_index_to_pandas(df) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 28b2dde139..e404e439ab 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1215,14 +1215,10 @@ def to_pandas( self._set_internal_query_job(query_job) return df.set_axis(self._block.column_labels, axis=1, copy=False) - def to_pandas_batches( - self, page_size: Optional[int] = None, max_results: Optional[int] = None - ) -> Iterable[pandas.DataFrame]: + def to_pandas_batches(self) -> Iterable[pandas.DataFrame]: """Stream DataFrame results to an iterable of pandas DataFrame""" self._optimize_query_complexity() - return self._block.to_pandas_batches( - page_size=page_size, max_results=max_results - ) + return self._block.to_pandas_batches() def _compute_dry_run(self) -> bigquery.QueryJob: return self._block._compute_dry_run() diff --git a/tests/system/load/test_large_tables.py b/tests/system/load/test_large_tables.py index d7d0dca37e..1a304f62a2 100644 --- a/tests/system/load/test_large_tables.py +++ b/tests/system/load/test_large_tables.py @@ -75,24 +75,16 @@ def test_index_repr_large_table(): def test_to_pandas_batches_large_table(): - df = bpd.read_gbq("load_testing.scalars_10gb") - # double size to catch 10gb limit errors - for column in df: - df[column + "_2"] = df[column] - # df will be downloaded locally - expected_row_count, expected_column_count = df.shape + df = bpd.read_gbq("load_testing.scalars_1tb") + _, expected_column_count = df.shape - row_count = 0 - for pdf in df.to_pandas_batches(page_size=500000): + # download only a few batches, since 1tb would be too much + iterator = iter(df.to_pandas_batches()) + for _ in range(3): + pdf = next(iterator) batch_row_count, batch_column_count = pdf.shape assert batch_column_count == expected_column_count - row_count += batch_row_count - - # Attempt to save on memory by manually removing the batch df - # from local memory after finishing with processing. 
- del pdf - - assert row_count == expected_row_count + assert batch_row_count > 0 @pytest.mark.skip(reason="See if it caused kokoro build aborted.") From 32f826ef781390d90583ba9f127161f95c83bf10 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Wed, 5 Jun 2024 02:47:05 +0000 Subject: [PATCH 3/6] don't pass bqstorage client --- bigframes/core/blocks.py | 6 ++++-- tests/system/load/test_large_tables.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index e4554ad2b4..23571b6112 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -518,9 +518,11 @@ def to_pandas_batches(self): api_name="cached", do_clustering=False, ) - results_iterator = query_job.result(page_size=50) + results_iterator = query_job.result() for arrow_table in results_iterator.to_arrow_iterable( - # bqstorage_client=self.session.bqstoragereadclient + # we can't pass bqstorage_client=self.session.bqstoragereadclient + # because large results will take too long to be downloaded to the + # storage and won't be streamed ): df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes) self._copy_index_to_pandas(df) diff --git a/tests/system/load/test_large_tables.py b/tests/system/load/test_large_tables.py index 1a304f62a2..93f1cb66d4 100644 --- a/tests/system/load/test_large_tables.py +++ b/tests/system/load/test_large_tables.py @@ -75,7 +75,7 @@ def test_index_repr_large_table(): def test_to_pandas_batches_large_table(): - df = bpd.read_gbq("load_testing.scalars_1tb") + df = bpd.read_gbq("load_testing.scalars_100gb") _, expected_column_count = df.shape # download only a few batches, since 1tb would be too much From 52a3f7814877ac32fb67112491b2443d6f996f32 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 6 Jun 2024 16:05:10 +0000 Subject: [PATCH 4/6] use page_size --- bigframes/core/blocks.py | 13 ++++++++++--- bigframes/dataframe.py | 26 +++++++++++++++++++++++--- tests/system/load/test_large_tables.py | 6 ++++-- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 23571b6112..3808b61fe4 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -508,8 +508,13 @@ def try_peek( else: return None - def to_pandas_batches(self): - """Download results one message at a time.""" + def to_pandas_batches( + self, page_size: Optional[int] = None, max_results: Optional[int] = None + ): + """Download results one message at a time. 
+ + page_size and max_results determine the size and number of batches, + see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result""" dtypes = dict(zip(self.index_columns, self.index.dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) _, query_job = self.session._query_to_destination( @@ -518,7 +523,9 @@ def to_pandas_batches(self): api_name="cached", do_clustering=False, ) - results_iterator = query_job.result() + results_iterator = query_job.result( + page_size=page_size, max_results=max_results + ) for arrow_table in results_iterator.to_arrow_iterable( # we can't pass bqstorage_client=self.session.bqstoragereadclient # because large results will take too long to be downloaded to the diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index e404e439ab..874ef76f6e 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1215,10 +1215,30 @@ def to_pandas( self._set_internal_query_job(query_job) return df.set_axis(self._block.column_labels, axis=1, copy=False) - def to_pandas_batches(self) -> Iterable[pandas.DataFrame]: - """Stream DataFrame results to an iterable of pandas DataFrame""" + def to_pandas_batches( + self, page_size: Optional[int] = None, max_results: Optional[int] = None + ) -> Iterable[pandas.DataFrame]: + """Stream DataFrame results to an iterable of pandas DataFrame. + + page_size and max_results determine the size and number of batches, + see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result + + Args: + page_size (int, default None): + The size of each batch. + max_results (int, default None): + If given, only download this many rows at maximum. + + Returns: + Iterable[pandas.DataFrame]: + An iterable of smaller dataframes which combine to + form the original dataframe. 
Results stream from bigquery, + see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable + """ self._optimize_query_complexity() - return self._block.to_pandas_batches() + return self._block.to_pandas_batches( + page_size=page_size, max_results=max_results + ) def _compute_dry_run(self) -> bigquery.QueryJob: return self._block._compute_dry_run() diff --git a/tests/system/load/test_large_tables.py b/tests/system/load/test_large_tables.py index 93f1cb66d4..955cc5c2e2 100644 --- a/tests/system/load/test_large_tables.py +++ b/tests/system/load/test_large_tables.py @@ -75,11 +75,13 @@ def test_index_repr_large_table(): def test_to_pandas_batches_large_table(): - df = bpd.read_gbq("load_testing.scalars_100gb") + df = bpd.read_gbq("load_testing.scalars_1tb") _, expected_column_count = df.shape # download only a few batches, since 1tb would be too much - iterator = iter(df.to_pandas_batches()) + iterator = iter(df.to_pandas_batches(page_size=500)) + # use page size since client library doesn't support + # streaming only part of the dataframe via bqstorage for _ in range(3): pdf = next(iterator) batch_row_count, batch_column_count = pdf.shape From b0963272129dd4591a5d3e25faac125392e64b58 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 6 Jun 2024 16:08:03 +0000 Subject: [PATCH 5/6] still pass storage client --- bigframes/core/blocks.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 3808b61fe4..0e0bd30be3 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -527,9 +527,7 @@ def to_pandas_batches( page_size=page_size, max_results=max_results ) for arrow_table in results_iterator.to_arrow_iterable( - # we can't pass bqstorage_client=self.session.bqstoragereadclient - # because large results will take too long to be downloaded to the - # storage and won't be streamed + bqstorage_client=self.session.bqstoragereadclient ): df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes) self._copy_index_to_pandas(df) From c4c380bf8e2e776addeff94b154c16f400faf8b0 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 6 Jun 2024 17:12:40 +0000 Subject: [PATCH 6/6] use max_results instead of iterator trick --- tests/system/load/test_large_tables.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/system/load/test_large_tables.py b/tests/system/load/test_large_tables.py index 955cc5c2e2..f92207b191 100644 --- a/tests/system/load/test_large_tables.py +++ b/tests/system/load/test_large_tables.py @@ -79,11 +79,10 @@ def test_to_pandas_batches_large_table(): _, expected_column_count = df.shape # download only a few batches, since 1tb would be too much - iterator = iter(df.to_pandas_batches(page_size=500)) + iterable = df.to_pandas_batches(page_size=500, max_results=1500) # use page size since client library doesn't support # streaming only part of the dataframe via bqstorage - for _ in range(3): - pdf = next(iterator) + for pdf in iterable: batch_row_count, batch_column_count = pdf.shape assert batch_column_count == expected_column_count assert batch_row_count > 0
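
Usage note (appended for reference, not part of the patches): a minimal sketch of how the arguments introduced by this series are intended to be called from user code. The table name and the page_size/max_results values are placeholders; both arguments are simply forwarded to QueryJob.result() as described in the PATCH 4/6 docstrings.

    import bigframes.pandas as bpd

    # Placeholder table name; any large BigQuery table behaves the same way.
    df = bpd.read_gbq("my_dataset.my_large_table")

    # Stream the result in pages of roughly 10,000 rows each and stop after
    # 50,000 rows in total. Each batch arrives as an ordinary pandas DataFrame.
    for batch in df.to_pandas_batches(page_size=10_000, max_results=50_000):
        print(batch.shape)
        del batch  # optionally release each batch before fetching the next one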
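
For context on the block-level change in PATCH 1/6 through 5/6, the sketch below illustrates the underlying google-cloud-bigquery pattern the new code relies on: page_size and max_results are passed to QueryJob.result(), and the resulting RowIterator is consumed as a stream of Arrow record batches. This is a simplified illustration rather than the bigframes implementation itself; the query is a placeholder, and bigframes additionally restores dtypes and the index on each batch.

    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query("SELECT * FROM `my_dataset.my_large_table`")  # placeholder query

    # page_size bounds the rows fetched per page; max_results caps the total
    # number of rows returned by the iterator.
    rows = query_job.result(page_size=10_000, max_results=50_000)

    # to_arrow_iterable() yields pyarrow.RecordBatch objects. PATCH 5/6 also
    # passes bqstorage_client=... here; this sketch omits it for simplicity.
    for record_batch in rows.to_arrow_iterable():
        print(record_batch.num_rows)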