34 changes: 23 additions & 11 deletions bigframes/core/blocks.py
@@ -559,10 +559,12 @@ def to_pandas(
return df, query_job

def try_peek(
self, n: int = 20, force: bool = False
self, n: int = 20, force: bool = False, allow_large_results=None
) -> typing.Optional[pd.DataFrame]:
if force or self.expr.supports_fast_peek:
result = self.session._executor.peek(self.expr, n)
result = self.session._executor.peek(
self.expr, n, use_explicit_destination=allow_large_results
)
df = io_pandas.arrow_to_pandas(result.to_arrow_table(), self.expr.schema)
self._copy_index_to_pandas(df)
return df
@@ -614,17 +616,27 @@ def _materialize_local(
self.expr,
ordered=materialize_options.ordered,
use_explicit_destination=materialize_options.allow_large_results,
get_size_bytes=True,
)
assert execute_result.total_bytes is not None
table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
sample_config = materialize_options.downsampling
max_download_size = sample_config.max_download_size
fraction = (
max_download_size / table_mb
if (max_download_size is not None) and (table_mb != 0)
else 2
)
if execute_result.total_bytes is not None:
table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES
max_download_size = sample_config.max_download_size
fraction = (
max_download_size / table_mb
if (max_download_size is not None) and (table_mb != 0)
else 2
)
else:
# Since we cannot acquire the table size without a query_job,
# we skip the sampling.
if sample_config.enable_downsampling:
warnings.warn(
"Sampling is disabled and there is no download size limit when 'allow_large_results' is set to "
"False. To prevent downloading excessive data, it is recommended to use the peek() method, or "
"limit the data with methods like .head() or .sample() before proceeding with downloads.",
UserWarning,
)
fraction = 2

# TODO: Maybe materialize before downsampling
# Some downsampling methods
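For readers following the sampling logic above: a minimal sketch of the intended fraction calculation, assuming _BYTES_TO_MEGABYTES is a plain bytes-per-megabyte constant and using simple arguments instead of the real MaterializationOptions object. A fraction greater than 1 just means no downsampling is applied.

import warnings

_BYTES_TO_MEGABYTES = 1024 * 1024  # assumption: bytes-per-MB constant as used in blocks.py

def choose_sampling_fraction(total_bytes, max_download_size_mb, enable_downsampling):
    """Return the fraction of rows to keep; any value > 1 means 'keep everything'."""
    if total_bytes is not None:
        table_mb = total_bytes / _BYTES_TO_MEGABYTES
        if max_download_size_mb is not None and table_mb != 0:
            return max_download_size_mb / table_mb
        return 2  # no download limit configured, so never downsample
    # Without a query job there is no table size, so sampling has to be skipped.
    if enable_downsampling:
        warnings.warn("Result size is unknown; downsampling is skipped.", UserWarning)
    return 2

# A 500 MB table with a 100 MB download cap keeps roughly 20% of rows.
choose_sampling_fraction(500 * 1024 * 1024, max_download_size_mb=100, enable_downsampling=True)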
3 changes: 2 additions & 1 deletion bigframes/core/indexes/base.py
@@ -505,7 +505,8 @@ def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Ind
df, query_job = self._block.index.to_pandas(
ordered=True, allow_large_results=allow_large_results
)
self._query_job = query_job
if query_job:
self._query_job = query_job
return df

def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray:
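The guard above matters because the jobless execution path returns no QueryJob to record. A small illustrative sketch; the table name is hypothetical:

import bigframes.pandas as bpd

idx = bpd.read_gbq("my-project.my_dataset.my_table").index  # hypothetical table
# With allow_large_results=False the query can run without a destination job,
# so there may be no QueryJob to attach to the Index.
idx.to_pandas(allow_large_results=False)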
19 changes: 14 additions & 5 deletions bigframes/dataframe.py
@@ -1587,7 +1587,8 @@ def to_arrow(
pa_table, query_job = self._block.to_arrow(
ordered=ordered, allow_large_results=allow_large_results
)
self._set_internal_query_job(query_job)
if query_job:
self._set_internal_query_job(query_job)
return pa_table

def to_pandas(
@@ -1637,7 +1638,8 @@ def to_pandas(
ordered=ordered,
allow_large_results=allow_large_results,
)
self._set_internal_query_job(query_job)
if query_job:
self._set_internal_query_job(query_job)
return df.set_axis(self._block.column_labels, axis=1, copy=False)

def to_pandas_batches(
@@ -1687,7 +1689,9 @@ def head(self, n: int = 5) -> DataFrame:
def tail(self, n: int = 5) -> DataFrame:
return typing.cast(DataFrame, self.iloc[-n:])

def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
def peek(
self, n: int = 5, *, force: bool = True, allow_large_results=None
) -> pandas.DataFrame:
"""
Preview n arbitrary rows from the dataframe. No guarantees about row selection or ordering.
``DataFrame.peek(force=False)`` will always be very fast, but will not succeed if data requires
@@ -1700,17 +1704,22 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame:
force (bool, default True):
If the data cannot be peeked efficiently, the dataframe will instead be fully materialized as part
of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
Returns:
pandas.DataFrame: A pandas DataFrame with n rows.

Raises:
ValueError: If force=False and data cannot be efficiently peeked.
"""
maybe_result = self._block.try_peek(n)
maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
if maybe_result is None:
if force:
self._cached()
maybe_result = self._block.try_peek(n, force=True)
maybe_result = self._block.try_peek(
n, force=True, allow_large_results=allow_large_results
)
assert maybe_result is not None
else:
raise ValueError(
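A short usage sketch of the new parameter (illustrative only; the table is a public sample, and the effective default depends on the session's global allow_large_results setting):

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# Keep the peek jobless and under the ~10 GB response limit.
small_preview = df.peek(5, allow_large_results=False)

# Allow an explicit destination table for results that may exceed the limit.
large_preview = df.peek(5, force=True, allow_large_results=True)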
1 change: 1 addition & 0 deletions bigframes/functions/_function_client.py
@@ -125,6 +125,7 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
)
assert query_job is not None
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

def _format_function_options(self, function_options: dict) -> str:
16 changes: 12 additions & 4 deletions bigframes/series.py
@@ -420,7 +420,8 @@ def to_pandas(
ordered=ordered,
allow_large_results=allow_large_results,
)
self._set_internal_query_job(query_job)
if query_job:
self._set_internal_query_job(query_job)
series = df.squeeze(axis=1)
series.name = self._name
return series
@@ -690,7 +691,9 @@ def head(self, n: int = 5) -> Series:
def tail(self, n: int = 5) -> Series:
return typing.cast(Series, self.iloc[-n:])

def peek(self, n: int = 5, *, force: bool = True) -> pandas.Series:
def peek(
self, n: int = 5, *, force: bool = True, allow_large_results=None
) -> pandas.Series:
"""
Preview n arbitrary elements from the series without guarantees about row selection or ordering.

@@ -704,17 +707,22 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.Series:
force (bool, default True):
If the data cannot be peeked efficiently, the series will instead be fully materialized as part
of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError.
allow_large_results (bool, default None):
If not None, overrides the global setting to allow or disallow large query results
over the default size limit of 10 GB.
Returns:
pandas.Series: A pandas Series with n rows.

Raises:
ValueError: If force=False and data cannot be efficiently peeked.
"""
maybe_result = self._block.try_peek(n)
maybe_result = self._block.try_peek(n, allow_large_results=allow_large_results)
if maybe_result is None:
if force:
self._cached()
maybe_result = self._block.try_peek(n, force=True)
maybe_result = self._block.try_peek(
n, force=True, allow_large_results=allow_large_results
)
assert maybe_result is not None
else:
raise ValueError(
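Continuing the DataFrame sketch above, the same parameter on Series, including the force=False error path the docstring describes (illustrative only; the column name is an assumption):

names = df["name"]  # hypothetical column from the DataFrame sketch above

try:
    # Fails fast instead of materializing if an efficient peek is impossible.
    names.peek(3, force=False, allow_large_results=False)
except ValueError:
    names.peek(3, force=True, allow_large_results=False)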
20 changes: 18 additions & 2 deletions bigframes/session/__init__.py
@@ -344,11 +344,25 @@ def _project(self):
@property
def bytes_processed_sum(self):
"""The sum of all bytes processed by bigquery jobs using this session."""
warnings.warn(
"Queries executed with `allow_large_results=False` within the session will not "
"have their bytes processed counted in this sum. If you need precise "
"bytes processed information, query the `INFORMATION_SCHEMA` tables "
"to get relevant metrics.",
UserWarning,
)
return self._metrics.bytes_processed

@property
def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
warnings.warn(
"Queries executed with `allow_large_results=False` within the session will not "
"have their slot milliseconds counted in this sum. If you need precise slot "
"milliseconds information, query the `INFORMATION_SCHEMA` tables "
"to get relevant metrics.",
UserWarning,
)
return self._metrics.slot_millis

@property
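Since the warnings above point users at INFORMATION_SCHEMA, a sketch of what that lookup could look like. The project and region are placeholders, and JOBS_BY_PROJECT with total_bytes_processed / total_slot_ms is standard BigQuery, not something introduced by this change:

import bigframes.pandas as bpd

# Placeholder region; substitute the one your session runs in.
job_stats = bpd.read_gbq(
    """
    SELECT
      SUM(total_bytes_processed) AS bytes_processed,
      SUM(total_slot_ms) AS slot_millis
    FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    """
)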
@@ -1675,11 +1689,13 @@ def _start_query_ml_ddl(
# so we must reset any encryption set in the job config
# https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
job_config.destination_encryption_configuration = None

return bf_io_bigquery.start_query_with_client(
iterator, query_job = bf_io_bigquery.start_query_with_client(
self.bqclient, sql, job_config=job_config, metrics=self._metrics
)

assert query_job is not None
return iterator, query_job

def _create_object_table(self, path: str, connection: str) -> str:
"""Create a random id Object Table from the input path and connection."""
table = str(self._loader._storage_manager._random_table())
17 changes: 16 additions & 1 deletion bigframes/session/_io/bigquery/__init__.py
@@ -228,14 +228,28 @@ def start_query_with_client(
timeout: Optional[float] = None,
api_name: Optional[str] = None,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
*,
query_with_job: bool = True,
) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts query job and waits for results.
"""
try:
# Note: Ensure no additional labels are added to job_config after this point,
# as `add_and_trim_labels` ensures the label count does not exceed 64.
add_and_trim_labels(job_config, api_name=api_name)
if not query_with_job:
results_iterator = bq_client.query_and_wait(
sql,
job_config=job_config,
location=location,
project=project,
api_timeout=timeout,
)
if metrics is not None:
metrics.count_job_stats()
return results_iterator, None

Review thread on metrics.count_job_stats():
Collaborator: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query response includes totalBytesProcessed. Let's create a follow-up issue to include those metrics, too.
Collaborator: Filed internal issue 400961399.

query_job = bq_client.query(
sql,
job_config=job_config,
@@ -338,6 +352,7 @@ def create_bq_dataset_reference(
# to the dataset, no BigQuery Session required. Note: there is a
# different anonymous dataset per location. See:
# https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
assert query_job is not None
query_destination = query_job.destination
return bigquery.DatasetReference(
query_destination.project,
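A sketch of how the two return shapes of this internal helper differ for callers. The import path follows the file shown above, and the client, SQL, and credentials are placeholders:

from google.cloud import bigquery
from bigframes.session._io.bigquery import start_query_with_client

bq_client = bigquery.Client()  # project and credentials come from the environment

# Default path: a QueryJob is created and returned alongside the row iterator.
iterator, query_job = start_query_with_client(
    bq_client, "SELECT 1 AS x", job_config=bigquery.QueryJobConfig()
)
assert query_job is not None

# Jobless path: query_and_wait() is used and the second element is None.
iterator, query_job = start_query_with_client(
    bq_client, "SELECT 1 AS x", job_config=bigquery.QueryJobConfig(), query_with_job=False
)
assert query_job is None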
35 changes: 25 additions & 10 deletions bigframes/session/executor.py
@@ -105,7 +105,6 @@ def execute(
*,
ordered: bool = True,
use_explicit_destination: Optional[bool] = False,
get_size_bytes: bool = False,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
):
@@ -152,6 +151,7 @@ def peek(
self,
array_value: bigframes.core.ArrayValue,
n_rows: int,
use_explicit_destination: Optional[bool] = False,
) -> ExecuteResult:
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -233,8 +233,7 @@ def execute(
array_value: bigframes.core.ArrayValue,
*,
ordered: bool = True,
use_explicit_destination: Optional[bool] = False,
get_size_bytes: bool = False,
use_explicit_destination: Optional[bool] = None,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
):
@@ -259,13 +258,14 @@ def execute(
job_config=job_config,
page_size=page_size,
max_results=max_results,
query_with_job=use_explicit_destination,
)

# Though we provide the read client, iterator may or may not use it based on what is efficient for the result
def iterator_supplier():
return iterator.to_arrow_iterable(bqstorage_client=self.bqstoragereadclient)

if get_size_bytes is True or use_explicit_destination:
if query_job:
size_bytes = self.bqclient.get_table(query_job.destination).num_bytes
else:
size_bytes = None
@@ -329,8 +329,7 @@ def export_gbq(
if if_exists != "append" and has_timedelta_col:
# Only update schema if this is not modifying an existing table, and the
# new table contains timedelta columns.
assert query_job.destination is not None
table = self.bqclient.get_table(query_job.destination)
table = self.bqclient.get_table(destination)
table.schema = array_value.schema.to_bigquery()
self.bqclient.update_table(table, ["schema"])

@@ -377,6 +376,7 @@ def peek(
self,
array_value: bigframes.core.ArrayValue,
n_rows: int,
use_explicit_destination: Optional[bool] = None,
) -> ExecuteResult:
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
@@ -385,12 +385,24 @@ def peek(
if not tree_properties.can_fast_peek(plan):
msg = "Peeking this value cannot be done efficiently."
warnings.warn(msg)
if use_explicit_destination is None:
use_explicit_destination = bigframes.options.bigquery.allow_large_results

job_config = bigquery.QueryJobConfig()
# Use explicit destination to avoid 10GB limit of temporary table
if use_explicit_destination:
destination_table = self.storage_manager.create_temp_table(
array_value.schema.to_bigquery(), cluster_cols=[]
)
job_config.destination = destination_table

Review thread on create_temp_table:
Collaborator: I don't think it needs to be a full table, right? We should be able to avoid the tables.create call. _random_table seems more appropriate (def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:).
Author: I assume the main reason is that we need to set an expiration time for the temp table.
Collaborator: Ah, good point.

sql = self.compiler.compile(plan, ordered=False, limit=n_rows)

# TODO(swast): plumb through the api_name of the user-facing api that
# caused this query.
iterator, query_job = self._run_execute_query(sql=sql)
iterator, query_job = self._run_execute_query(
sql=sql, job_config=job_config, query_with_job=use_explicit_destination
)
return ExecuteResult(
# Probably don't need read client for small peek results, but let client decide
arrow_batches=lambda: iterator.to_arrow_iterable(
@@ -485,7 +497,8 @@ def _run_execute_query(
api_name: Optional[str] = None,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
) -> Tuple[bq_table.RowIterator, bigquery.QueryJob]:
query_with_job: bool = True,
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts BigQuery query job and waits for results.
"""
@@ -503,15 +516,17 @@
# as `add_and_trim_labels` ensures the label count does not exceed 64.
bq_io.add_and_trim_labels(job_config, api_name=api_name)
try:
return bq_io.start_query_with_client(
iterator, query_job = bq_io.start_query_with_client(
self.bqclient,
sql,
job_config=job_config,
api_name=api_name,
max_results=max_results,
page_size=page_size,
metrics=self.metrics,
query_with_job=query_with_job,
)
return iterator, query_job

except google.api_core.exceptions.BadRequest as e:
# Unfortunately, this error type does not have a separate error code or exception type
@@ -642,7 +657,7 @@ def _sql_as_cached_temp_table(
job_config=job_config,
api_name="cached",
)
query_job.destination
assert query_job is not None
query_job.result()
return query_job.destination

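A compressed sketch of how the executor changes above fit together: the explicit-destination flag becomes query_with_job, and a missing QueryJob simply means the result size is unknown downstream. The names are simplified stand-ins, not the real executor code:

from typing import Optional

def execute_sketch(sql: str, use_explicit_destination: Optional[bool], run_query, get_num_bytes):
    """run_query stands in for _run_execute_query; get_num_bytes for bqclient.get_table(...).num_bytes."""
    iterator, query_job = run_query(sql, query_with_job=bool(use_explicit_destination))
    # Only a real QueryJob has a destination table whose size can be read;
    # otherwise size_bytes stays None and callers such as _materialize_local skip sampling.
    size_bytes = get_num_bytes(query_job.destination) if query_job else None
    return iterator, query_job, size_bytes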
4 changes: 3 additions & 1 deletion bigframes/session/loader.py
@@ -726,14 +726,16 @@ def _start_query(
job_config.maximum_bytes_billed = (
bigframes.options.compute.maximum_bytes_billed
)
return bf_io_bigquery.start_query_with_client(
iterator, query_job = bf_io_bigquery.start_query_with_client(
self._bqclient,
sql,
job_config=job_config,
max_results=max_results,
timeout=timeout,
api_name=api_name,
)
assert query_job is not None
return iterator, query_job


def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict: