
Commit 67487b9

Genesis929 and tswast authored
feat: add allow_large_results to peek (#1448)
* feat: add allow_large_results to peek
* mypy fix
* update sampling logic
* update sampling logic
* update default value
* update annotation
* type fixes
* update assert
* update metrics, tests and warnings
* Apply suggestions from code review

---------

Co-authored-by: Tim Sweña (Swast) <[email protected]>
1 parent 0070e77 commit 67487b9

16 files changed: +196 −59 lines changed


bigframes/core/blocks.py

Lines changed: 23 additions & 11 deletions
@@ -559,10 +559,12 @@ def to_pandas(
         return df, query_job
 
     def try_peek(
-        self, n: int = 20, force: bool = False
+        self, n: int = 20, force: bool = False, allow_large_results=None
     ) -> typing.Optional[pd.DataFrame]:
         if force or self.expr.supports_fast_peek:
-            result = self.session._executor.peek(self.expr, n)
+            result = self.session._executor.peek(
+                self.expr, n, use_explicit_destination=allow_large_results
+            )
             df = io_pandas.arrow_to_pandas(result.to_arrow_table(), self.expr.schema)
             self._copy_index_to_pandas(df)
             return df
@@ -614,17 +616,27 @@ def _materialize_local(
             self.expr,
             ordered=materialize_options.ordered,
             use_explicit_destination=materialize_options.allow_large_results,
-            get_size_bytes=True,
         )
-        assert execute_result.total_bytes is not None
-        table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
         sample_config = materialize_options.downsampling
-        max_download_size = sample_config.max_download_size
-        fraction = (
-            max_download_size / table_mb
-            if (max_download_size is not None) and (table_mb != 0)
-            else 2
-        )
+        if execute_result.total_bytes is not None:
+            table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
+            max_download_size = sample_config.max_download_size
+            fraction = (
+                max_download_size / table_mb
+                if (max_download_size is not None) and (table_mb != 0)
+                else 2
+            )
+        else:
+            # Since we cannot acquire the table size without a query_job,
+            # we skip the sampling.
+            if sample_config.enable_downsampling:
+                warnings.warn(
+                    "Sampling is disabled and there is no download size limit when 'allow_large_results' is set to "
+                    "False. To prevent downloading excessive data, it is recommended to use the peek() method, or "
+                    "limit the data with methods like .head() or .sample() before proceeding with downloads.",
+                    UserWarning,
+                )
+            fraction = 2
 
         # TODO: Maybe materialize before downsampling
         # Some downsampling methods
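For reference, the branch above only computes a download fraction when a result size is known; without a query job there is no table size, so downsampling is skipped. A standalone sketch of that logic follows (the `_BYTES_TO_MEGABYTES` value and the shortened warning text are illustrative assumptions, not the exact module contents):

import warnings
from typing import Optional

_BYTES_TO_MEGABYTES = 1024 * 1024  # assumption: stands in for the module-level constant

def choose_download_fraction(
    total_bytes: Optional[int],
    max_download_size_mb: Optional[float],
    enable_downsampling: bool,
) -> float:
    # A fraction >= 1 (the sentinel value 2 here) means "download everything".
    if total_bytes is not None:
        table_mb = total_bytes / _BYTES_TO_MEGABYTES
        if max_download_size_mb is not None and table_mb != 0:
            return max_download_size_mb / table_mb
        return 2
    # No query job, so the table size is unknown and sampling is skipped.
    if enable_downsampling:
        warnings.warn(
            "Sampling is disabled and there is no download size limit when "
            "'allow_large_results' is set to False.",
            UserWarning,
        )
    return 2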

bigframes/core/indexes/base.py

Lines changed: 2 additions & 1 deletion
@@ -505,7 +505,8 @@ def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Ind
         df, query_job = self._block.index.to_pandas(
             ordered=True, allow_large_results=allow_large_results
         )
-        self._query_job = query_job
+        if query_job:
+            self._query_job = query_job
         return df
 
     def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray:

bigframes/dataframe.py

Lines changed: 14 additions & 5 deletions
@@ -1587,7 +1587,8 @@ def to_arrow(
         pa_table, query_job = self._block.to_arrow(
             ordered=ordered, allow_large_results=allow_large_results
         )
-        self._set_internal_query_job(query_job)
+        if query_job:
+            self._set_internal_query_job(query_job)
         return pa_table
 
     def to_pandas(
@@ -1637,7 +1638,8 @@ def to_pandas(
             ordered=ordered,
             allow_large_results=allow_large_results,
         )
-        self._set_internal_query_job(query_job)
+        if query_job:
+            self._set_internal_query_job(query_job)
         return df.set_axis(self._block.column_labels, axis=1, copy=False)
 
     def to_pandas_batches(
@@ -1687,7 +1689,9 @@ def head(self, n: int = 5) -> DataFrame:
     def tail(self, n: int = 5) -> DataFrame:
         return typing.cast(DataFrame, self.iloc[-n:])
 
-    def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
+    def peek(
+        self, n: int = 5, *, force: bool = True, allow_large_results=None
+    ) -> pandas.DataFrame:
         """
         Preview n arbitrary rows from the dataframe. No guarantees about row selection or ordering.
         ``DataFrame.peek(force=False)`` will always be very fast, but will not succeed if data requires
@@ -1700,17 +1704,22 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
             force (bool, default True):
                 If the data cannot be peeked efficiently, the dataframe will instead be fully materialized as part
                 of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.
         Returns:
             pandas.DataFrame: A pandas DataFrame with n rows.
 
         Raises:
             ValueError: If force=False and data cannot be efficiently peeked.
         """
-        maybe_result = self._block.try_peek(n)
+        maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
         if maybe_result is None:
             if force:
                 self._cached()
-                maybe_result = self._block.try_peek(n, force=True)
+                maybe_result = self._block.try_peek(
+                    n, force=True, allow_large_results=allow_large_results
+                )
                 assert maybe_result is not None
             else:
                 raise ValueError(
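A minimal usage sketch of the new keyword (the public dataset name is only an example of an accessible table):

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

preview = df.peek(10, allow_large_results=False)  # no destination table; result must fit the ~10 GB limit
large = df.peek(10, allow_large_results=True)     # writes to an explicit temp table, so larger results are fine
default = df.peek(10)                             # None: defer to bigframes.options.bigquery.allow_large_results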

bigframes/functions/_function_client.py

Lines changed: 1 addition & 0 deletions
@@ -125,6 +125,7 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
             create_function_ddl,
             job_config=bigquery.QueryJobConfig(),
         )
+        assert query_job is not None
         logger.info(f"Created bigframes function {query_job.ddl_target_routine}")
 
     def _format_function_options(self, function_options: dict) -> str:

bigframes/series.py

Lines changed: 12 additions & 4 deletions
@@ -420,7 +420,8 @@ def to_pandas(
             ordered=ordered,
             allow_large_results=allow_large_results,
         )
-        self._set_internal_query_job(query_job)
+        if query_job:
+            self._set_internal_query_job(query_job)
         series = df.squeeze(axis=1)
         series.name = self._name
         return series
@@ -690,7 +691,9 @@ def head(self, n: int = 5) -> Series:
     def tail(self, n: int = 5) -> Series:
         return typing.cast(Series, self.iloc[-n:])
 
-    def peek(self, n: int = 5, *, force: bool = True) -> pandas.Series:
+    def peek(
+        self, n: int = 5, *, force: bool = True, allow_large_results=None
+    ) -> pandas.Series:
         """
         Preview n arbitrary elements from the series without guarantees about row selection or ordering.
 
@@ -704,17 +707,22 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.Series:
             force (bool, default True):
                 If the data cannot be peeked efficiently, the series will instead be fully materialized as part
                 of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.
         Returns:
             pandas.Series: A pandas Series with n rows.
 
         Raises:
             ValueError: If force=False and data cannot be efficiently peeked.
         """
-        maybe_result = self._block.try_peek(n)
+        maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
        if maybe_result is None:
            if force:
                self._cached()
-                maybe_result = self._block.try_peek(n, force=True)
+                maybe_result = self._block.try_peek(
+                    n, force=True, allow_large_results=allow_large_results
+                )
                assert maybe_result is not None
            else:
                raise ValueError(
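Series.peek mirrors the DataFrame behavior; a short sketch (same assumed public-table setup):

import bigframes.pandas as bpd

words = bpd.read_gbq("bigquery-public-data.samples.shakespeare")["word"]
words.peek(5, allow_large_results=False)  # returns a pandas.Series with 5 arbitrary elements
words.peek(5, force=False)                # raises ValueError only if the data cannot be peeked efficiently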

bigframes/session/__init__.py

Lines changed: 18 additions & 2 deletions
@@ -344,11 +344,25 @@ def _project(self):
     @property
     def bytes_processed_sum(self):
         """The sum of all bytes processed by bigquery jobs using this session."""
+        warnings.warn(
+            "Queries executed with `allow_large_results=False` within the session will not "
+            "have their bytes processed counted in this sum. If you need precise "
+            "bytes processed information, query the `INFORMATION_SCHEMA` tables "
+            "to get relevant metrics.",
+            UserWarning,
+        )
         return self._metrics.bytes_processed
 
     @property
     def slot_millis_sum(self):
         """The sum of all slot time used by bigquery jobs in this session."""
+        warnings.warn(
+            "Queries executed with `allow_large_results=False` within the session will not "
+            "have their slot milliseconds counted in this sum. If you need precise slot "
+            "milliseconds information, query the `INFORMATION_SCHEMA` tables "
+            "to get relevant metrics.",
+            UserWarning,
+        )
         return self._metrics.slot_millis
 
     @property
@@ -1675,11 +1689,13 @@ def _start_query_ml_ddl(
         # so we must reset any encryption set in the job config
         # https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
         job_config.destination_encryption_configuration = None
-
-        return bf_io_bigquery.start_query_with_client(
+        iterator, query_job = bf_io_bigquery.start_query_with_client(
             self.bqclient, sql, job_config=job_config, metrics=self._metrics
         )
 
+        assert query_job is not None
+        return iterator, query_job
+
     def _create_object_table(self, path: str, connection: str) -> str:
         """Create a random id Object Table from the input path and connection."""
         table = str(self._loader._storage_manager._random_table())
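Because jobless queries never reach the session metrics, the new warnings point users at INFORMATION_SCHEMA. A hedged sketch of such a lookup (the region qualifier, lookback window, and aggregation are illustrative assumptions; adjust to your project):

from google.cloud import bigquery

client = bigquery.Client()
sql = """
SELECT
  SUM(total_bytes_processed) AS bytes_processed,
  SUM(total_slot_ms) AS slot_millis
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_USER
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
"""
row = next(iter(client.query_and_wait(sql)))  # single aggregate row
print(row.bytes_processed, row.slot_millis)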

bigframes/session/_io/bigquery/__init__.py

Lines changed: 16 additions & 1 deletion
@@ -228,14 +228,28 @@ def start_query_with_client(
     timeout: Optional[float] = None,
     api_name: Optional[str] = None,
     metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
-) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+    *,
+    query_with_job: bool = True,
+) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
     """
     Starts query job and waits for results.
     """
     try:
         # Note: Ensure no additional labels are added to job_config after this point,
         # as `add_and_trim_labels` ensures the label count does not exceed 64.
         add_and_trim_labels(job_config, api_name=api_name)
+        if not query_with_job:
+            results_iterator = bq_client.query_and_wait(
+                sql,
+                job_config=job_config,
+                location=location,
+                project=project,
+                api_timeout=timeout,
+            )
+            if metrics is not None:
+                metrics.count_job_stats()
+            return results_iterator, None
+
         query_job = bq_client.query(
             sql,
             job_config=job_config,
@@ -338,6 +352,7 @@ def create_bq_dataset_reference(
     # to the dataset, no BigQuery Session required. Note: there is a
     # different anonymous dataset per location. See:
     # https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
+    assert query_job is not None
     query_destination = query_job.destination
     return bigquery.DatasetReference(
         query_destination.project,
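The `query_with_job=False` branch relies on the client's `query_and_wait` API, which returns rows without surfacing a `QueryJob`. A minimal comparison of the two client calls (requires a reasonably recent google-cloud-bigquery; project and credentials come from the environment):

from google.cloud import bigquery

client = bigquery.Client()
sql = "SELECT 1 AS x"

rows = client.query_and_wait(sql)  # query_with_job=False path: rows only, no job to read stats or a destination from

job = client.query(sql)            # query_with_job=True path: a QueryJob is created
rows_with_job = job.result()
print(job.total_bytes_processed, job.destination)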

bigframes/session/executor.py

Lines changed: 25 additions & 10 deletions
@@ -105,7 +105,6 @@ def execute(
         *,
         ordered: bool = True,
         use_explicit_destination: Optional[bool] = False,
-        get_size_bytes: bool = False,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
     ):
@@ -152,6 +151,7 @@ def peek(
         self,
         array_value: bigframes.core.ArrayValue,
         n_rows: int,
+        use_explicit_destination: Optional[bool] = False,
     ) -> ExecuteResult:
         """
         A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -233,8 +233,7 @@ def execute(
         array_value: bigframes.core.ArrayValue,
         *,
         ordered: bool = True,
-        use_explicit_destination: Optional[bool] = False,
-        get_size_bytes: bool = False,
+        use_explicit_destination: Optional[bool] = None,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
     ):
@@ -259,13 +258,14 @@ def execute(
             job_config=job_config,
             page_size=page_size,
             max_results=max_results,
+            query_with_job=use_explicit_destination,
         )
 
         # Though we provide the read client, iterator may or may not use it based on what is efficient for the result
         def iterator_supplier():
             return iterator.to_arrow_iterable(bqstorage_client=self.bqstoragereadclient)
 
-        if get_size_bytes is True or use_explicit_destination:
+        if query_job:
             size_bytes = self.bqclient.get_table(query_job.destination).num_bytes
         else:
             size_bytes = None
@@ -329,8 +329,7 @@ def export_gbq(
         if if_exists != "append" and has_timedelta_col:
             # Only update schema if this is not modifying an existing table, and the
             # new table contains timedelta columns.
-            assert query_job.destination is not None
-            table = self.bqclient.get_table(query_job.destination)
+            table = self.bqclient.get_table(destination)
             table.schema = array_value.schema.to_bigquery()
             self.bqclient.update_table(table, ["schema"])
 
@@ -377,6 +376,7 @@ def peek(
         self,
         array_value: bigframes.core.ArrayValue,
         n_rows: int,
+        use_explicit_destination: Optional[bool] = None,
     ) -> ExecuteResult:
         """
         A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -385,12 +385,24 @@ def peek(
         if not tree_properties.can_fast_peek(plan):
             msg = "Peeking this value cannot be done efficiently."
             warnings.warn(msg)
+        if use_explicit_destination is None:
+            use_explicit_destination = bigframes.options.bigquery.allow_large_results
+
+        job_config = bigquery.QueryJobConfig()
+        # Use explicit destination to avoid 10GB limit of temporary table
+        if use_explicit_destination:
+            destination_table = self.storage_manager.create_temp_table(
+                array_value.schema.to_bigquery(), cluster_cols=[]
+            )
+            job_config.destination = destination_table
 
         sql = self.compiler.compile(plan, ordered=False, limit=n_rows)
 
         # TODO(swast): plumb through the api_name of the user-facing api that
         # caused this query.
-        iterator, query_job = self._run_execute_query(sql=sql)
+        iterator, query_job = self._run_execute_query(
+            sql=sql, job_config=job_config, query_with_job=use_explicit_destination
+        )
         return ExecuteResult(
             # Probably don't need read client for small peek results, but let client decide
             arrow_batches=lambda: iterator.to_arrow_iterable(
@@ -485,7 +497,8 @@ def _run_execute_query(
         api_name: Optional[str] = None,
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
-    ) -> Tuple[bq_table.RowIterator, bigquery.QueryJob]:
+        query_with_job: bool = True,
+    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
         """
         Starts BigQuery query job and waits for results.
         """
@@ -503,15 +516,17 @@ def _run_execute_query(
         # as `add_and_trim_labels` ensures the label count does not exceed 64.
         bq_io.add_and_trim_labels(job_config, api_name=api_name)
         try:
-            return bq_io.start_query_with_client(
+            iterator, query_job = bq_io.start_query_with_client(
                 self.bqclient,
                 sql,
                 job_config=job_config,
                 api_name=api_name,
                 max_results=max_results,
                 page_size=page_size,
                 metrics=self.metrics,
+                query_with_job=query_with_job,
            )
+            return iterator, query_job
 
         except google.api_core.exceptions.BadRequest as e:
             # Unfortunately, this error type does not have a separate error code or exception type
@@ -642,7 +657,7 @@ def _sql_as_cached_temp_table(
             job_config=job_config,
             api_name="cached",
         )
-        query_job.destination
+        assert query_job is not None
         query_job.result()
         return query_job.destination
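From the user side, the executor's `use_explicit_destination=None` default means peeks follow the session-wide option; disabling it trades the 10 GB safety net (and per-job metrics) for a lighter, jobless query. A short sketch (the public table name is an assumption):

import bigframes.pandas as bpd

bpd.options.bigquery.allow_large_results = False  # global default picked up when allow_large_results is None

df = bpd.read_gbq("bigquery-public-data.samples.shakespeare")
df.peek(5)                            # jobless fast path: no destination table, no QueryJob
df.peek(5, allow_large_results=True)  # per-call override: results go to an explicit temporary table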

bigframes/session/loader.py

Lines changed: 3 additions & 1 deletion
@@ -726,14 +726,16 @@ def _start_query(
             job_config.maximum_bytes_billed = (
                 bigframes.options.compute.maximum_bytes_billed
             )
-        return bf_io_bigquery.start_query_with_client(
+        iterator, query_job = bf_io_bigquery.start_query_with_client(
             self._bqclient,
             sql,
             job_config=job_config,
             max_results=max_results,
             timeout=timeout,
             api_name=api_name,
         )
+        assert query_job is not None
+        return iterator, query_job
 
 
 def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
