Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import textwrap
import typing
from typing import (
Callable,
Iterable,
Iterator,
List,
Expand Down Expand Up @@ -679,6 +680,7 @@ def to_pandas_batches(
page_size: Optional[int] = None,
max_results: Optional[int] = None,
allow_large_results: Optional[bool] = None,
callback: Callable = lambda _: None,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per discussion offline, we may want to provide a global progress publisher object rather than add callbacks to each method that needs them.

) -> Iterator[pd.DataFrame]:
"""Download results one message at a time.

Expand All @@ -696,6 +698,7 @@ def to_pandas_batches(
promise_under_10gb=under_10gb,
ordered=True,
),
callback=callback,
)

# To reduce the number of edge cases to consider when working with the
Expand Down
15 changes: 15 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,21 @@ def to_pandas_batches(
allow_large_results=allow_large_results,
)

def _to_pandas_batches_colab(
    self,
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
    *,
    allow_large_results: Optional[bool] = None,
    callback: Callable = lambda _: None,
) -> Iterable[pandas.DataFrame]:
    """Colab-specific variant of ``to_pandas_batches`` with progress reporting.

    Args:
        page_size:
            Maximum number of rows per downloaded batch. Passed through to
            the block-level download.
        max_results:
            Cap on the total number of rows to download.
        allow_large_results:
            Passed through to the underlying batch download; see
            ``to_pandas_batches``.
        callback (Callable):
            A callback function used by bigframes to report query progress.
            Defaults to a no-op.

    Returns:
        An iterable of pandas DataFrames, one per downloaded page.
    """
    # Pure delegation: the Block layer threads ``callback`` through so that
    # progress events surface while results are downloaded.
    return self._block.to_pandas_batches(
        page_size=page_size,
        max_results=max_results,
        allow_large_results=allow_large_results,
        callback=callback,
    )

def _compute_dry_run(self) -> bigquery.QueryJob:
    """Dry-run this DataFrame's query and return the resulting job metadata."""
    # The block returns a pair; only the QueryJob half is relevant here.
    return self._block._compute_dry_run()[1]
Expand Down
12 changes: 10 additions & 2 deletions bigframes/display/anywidget.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import pandas as pd

import bigframes
import bigframes.dataframe
import bigframes.display.html

# anywidget and traitlets are optional dependencies. We don't want the import of this
Expand Down Expand Up @@ -73,7 +74,7 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame):
initial_page_size = bigframes.options.display.max_rows

# Initialize data fetching attributes.
self._batches = dataframe.to_pandas_batches(page_size=initial_page_size)
# self._batches = dataframe._to_pandas_batches_colab(page_size=initial_page_size, callback=self._update_progress)

# set traitlets properties that trigger observers
self.page_size = initial_page_size
Expand All @@ -100,6 +101,7 @@ def _css(self):
page = traitlets.Int(0).tag(sync=True)
page_size = traitlets.Int(25).tag(sync=True)
row_count = traitlets.Int(0).tag(sync=True)
progress_html = traitlets.Unicode().tag(sync=True)
table_html = traitlets.Unicode().tag(sync=True)

@traitlets.validate("page")
Expand Down Expand Up @@ -145,6 +147,10 @@ def _validate_page_size(self, proposal: Dict[str, Any]) -> int:
max_page_size = 1000
return min(value, max_page_size)

def _update_progress(self, event):
    """Render a progress ``event`` into the widget's ``progress_html`` trait.

    ``progress_html`` is a synced traitlet, so assigning it pushes the new
    HTML to the frontend, where it is injected into the progress container.
    """
    # TODO: use formatting helpers here.
    # For now, show the raw event repr; repr output is escaped-safe enough
    # for a draft, but formatting helpers should produce the final HTML.
    self.progress_html = f"<code>{repr(event)}</code>"

def _get_next_batch(self) -> bool:
"""
Gets the next batch of data from the generator and appends to cache.
Expand Down Expand Up @@ -180,7 +186,9 @@ def _cached_data(self) -> pd.DataFrame:

def _reset_batches_for_new_page_size(self):
    """Reset the batch iterator when page size changes."""
    # Recreate the batch stream with the new page size; progress events are
    # routed to _update_progress so the widget can display download status.
    self._batches = self._dataframe._to_pandas_batches_colab(
        page_size=self.page_size, callback=self._update_progress
    )
    # Drop all previously cached state so pagination restarts from scratch.
    self._cached_batches = []
    self._batch_iter = None
    self._all_data_loaded = False
Expand Down
13 changes: 13 additions & 0 deletions bigframes/display/table_widget.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ const ModelProperty = {
PAGE_SIZE: "page_size",
ROW_COUNT: "row_count",
TABLE_HTML: "table_html",
PROGRESS_HTML: "progress_html",
};

const Event = {
CHANGE: "change",
CHANGE_TABLE_HTML: `change:${ModelProperty.TABLE_HTML}`,
CHANGE_PROGRESS_HTML: `change:${ModelProperty.PROGRESS_HTML}`,
CLICK: "click",
};

Expand All @@ -39,6 +41,7 @@ function render({ model, el }) {
el.classList.add("bigframes-widget");

// Structure
const progressContainer = document.createElement("div");
const tableContainer = document.createElement("div");
const footer = document.createElement("div");

Expand All @@ -57,6 +60,7 @@ function render({ model, el }) {
const pageSizeSelect = document.createElement("select");

// Add CSS classes
progressContainer.classList.add("progress-container");
tableContainer.classList.add("table-container");
footer.classList.add("footer");
paginationContainer.classList.add("pagination");
Expand Down Expand Up @@ -119,6 +123,13 @@ function render({ model, el }) {
}
}

/** Updates the HTML in the progress container. */
function handleProgressHTMLChange() {
// Note: Using innerHTML is safe here because the content is generated
// by a trusted backend (formatting_helpers).
progressContainer.innerHTML = model.get(ModelProperty.PROGRESS_HTML);
}

/** Updates the HTML in the table container and refreshes button states. */
function handleTableHTMLChange() {
// Note: Using innerHTML is safe here because the content is generated
Expand All @@ -137,6 +148,7 @@ function render({ model, el }) {
}
});
model.on(Event.CHANGE_TABLE_HTML, handleTableHTMLChange);
model.on(Event.CHANGE_PROGRESS_HTML, handleProgressHTMLChange);

// Assemble the DOM
paginationContainer.appendChild(prevPage);
Expand All @@ -150,6 +162,7 @@ function render({ model, el }) {
footer.appendChild(paginationContainer);
footer.appendChild(pageSizeContainer);

el.appendChild(progressContainer);
el.appendChild(tableContainer);
el.appendChild(footer);

Expand Down
116 changes: 84 additions & 32 deletions bigframes/formatting_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
# limitations under the License.

"""Shared helper functions for formatting jobs related info."""
# TODO(orrbradford): clean up typings and documentation in this file

from __future__ import annotations

import datetime
import random
from typing import Any, Optional, Type, Union
from typing import Any, Callable, Optional, Type, Union

import bigframes_vendored.constants as constants
import google.api_core.exceptions as api_core_exceptions
import google.cloud.bigquery as bigquery
import google.cloud.bigquery._job_helpers
import humanize
import IPython
import IPython.display as display
Expand All @@ -40,6 +42,45 @@
}


def create_progress_bar_callback(
    *,
    progress_bar: Optional[str] = None,
    callback: Callable = lambda _: None,
) -> Callable:
    """Wrap ``callback`` so progress is also rendered for the chosen UI.

    Args:
        progress_bar:
            One of ``"auto"``, ``"notebook"``, ``"terminal"``, or ``None``.
            ``"auto"`` picks ``"notebook"`` inside IPython, else
            ``"terminal"``. Any other value disables rendering.
        callback (Callable):
            The inner callback to invoke with every progress event.

    Returns:
        A callable that forwards each event to ``callback`` and, when a
        progress-bar mode is selected, renders the event for that mode.
    """
    if progress_bar == "auto":
        progress_bar = "notebook" if in_ipython() else "terminal"

    if progress_bar == "notebook":
        # Create the display placeholder up front so later events can
        # update it in place via its display_id.
        loading_bar = display.HTML("")
        display_id = str(random.random())
        display.display(loading_bar, display_id=display_id)

        def notebook_callback(event):
            callback(event)
            display.update_display(
                display.HTML(get_query_job_loading_html(event)),
                display_id=display_id,
            )

        return notebook_callback

    if progress_bar == "terminal":
        # Only print when the rendered text changes, to avoid spamming
        # identical lines to the terminal.
        last_rendered = ""

        def terminal_callback(event):
            nonlocal last_rendered

            callback(event)

            rendered = get_query_job_loading_string(event)
            if rendered != last_rendered:
                print(rendered)
                last_rendered = rendered

        return terminal_callback

    # No rendering requested: pass events straight through.
    return callback


def add_feedback_link(
exception: Union[
api_core_exceptions.RetryError, api_core_exceptions.GoogleAPICallError
Expand Down Expand Up @@ -123,7 +164,7 @@ def wait_for_query_job(
query_job: bigquery.QueryJob,
max_results: Optional[int] = None,
page_size: Optional[int] = None,
progress_bar: Optional[str] = None,
callback: Callable = lambda _: None,
) -> bigquery.table.RowIterator:
"""Return query results. Displays a progress bar while the query is running
Args:
Expand All @@ -138,46 +179,57 @@ def wait_for_query_job(
Returns:
A row iterator over the query results.
"""
if progress_bar == "auto":
progress_bar = "notebook" if in_ipython() else "terminal"

try:
if progress_bar == "notebook":
display_id = str(random.random())
loading_bar = display.HTML(get_query_job_loading_html(query_job))
display.display(loading_bar, display_id=display_id)
query_result = query_job.result(
max_results=max_results, page_size=page_size
callback(
# DONOTSUBMIT: we should create our own events.
google.cloud.bigquery._job_helpers.QueryReceivedEvent(
billing_project=query_job.project,
location=query_job.location,
job_id=query_job.job_id,
statement_type=query_job.statement_type,
state=query_job.state,
query_plan=query_job.query_plan,
created=query_job.created,
started=query_job.started,
ended=query_job.ended,
)
query_job.reload()
display.update_display(
display.HTML(get_query_job_loading_html(query_job)),
display_id=display_id,
)
elif progress_bar == "terminal":
initial_loading_bar = get_query_job_loading_string(query_job)
print(initial_loading_bar)
query_result = query_job.result(
max_results=max_results, page_size=page_size
)
query_job.reload()
if initial_loading_bar != get_query_job_loading_string(query_job):
print(get_query_job_loading_string(query_job))
else:
# No progress bar.
query_result = query_job.result(
max_results=max_results, page_size=page_size
)
# TODO(tswast): Add a timeout so that progress bars can make updates as
# the query stats come in.
# TODO(tswast): Listen for cancellation on the callback (or maybe
# callbacks should just raise KeyboardInterrupt like IPython does?).
query_results = query_job.result(
page_size=page_size,
max_results=max_results,
)
callback(
# DONOTSUBMIT: we should create our own events.
google.cloud.bigquery._job_helpers.QueryFinishedEvent(
billing_project=query_job.project,
location=query_results.location,
query_id=query_results.query_id,
job_id=query_results.job_id,
total_rows=query_results.total_rows,
total_bytes_processed=query_results.total_bytes_processed,
slot_millis=query_results.slot_millis,
destination=query_job.destination,
created=query_job.created,
started=query_job.started,
ended=query_job.ended,
)
query_job.reload()
return query_result
)
return query_results
except api_core_exceptions.RetryError as exc:
# TODO: turn this into a callback event, too.
add_feedback_link(exc)
raise
except api_core_exceptions.GoogleAPICallError as exc:
# TODO: turn this into a callback event, too.
add_feedback_link(exc)
raise
except KeyboardInterrupt:
query_job.cancel()
# TODO: turn this into a callback event, too.
print(
f"Requested cancellation for {query_job.job_type.capitalize()}"
f" job {query_job.job_id} in location {query_job.location}..."
Expand Down
7 changes: 6 additions & 1 deletion bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ def _try_read_gbq_colab_sessionless_dry_run(
with _default_location_lock:
if not config.options.bigquery._session_started:
return _run_read_gbq_colab_sessionless_dry_run(
query, pyformat_args=pyformat_args
query,
pyformat_args=pyformat_args,
)

# Explicitly return None to indicate that we didn't run the dry run query.
Expand Down Expand Up @@ -305,6 +306,7 @@ def _read_gbq_colab(
*,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
callback: Callable = lambda _: None,
) -> bigframes.dataframe.DataFrame | pandas.Series:
"""A Colab-specific version of read_gbq.

Expand All @@ -319,6 +321,8 @@ def _read_gbq_colab(
dry_run (bool):
If True, estimates the query results size without returning data.
The return will be a pandas Series with query metadata.
callback (Callable):
A callback function used by bigframes to report query progress.

Returns:
Union[bigframes.dataframe.DataFrame, pandas.Series]:
Expand Down Expand Up @@ -364,6 +368,7 @@ def _read_gbq_colab(
query_or_table,
pyformat_args=pyformat_args,
dry_run=dry_run,
callback=callback,
)


Expand Down
7 changes: 7 additions & 0 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ def _read_gbq_colab(
*,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
callback: Callable = lambda _: None,
) -> Union[dataframe.DataFrame, pandas.Series]:
"""A version of read_gbq that has the necessary default values for use in colab integrations.

Expand All @@ -519,6 +520,11 @@ def _read_gbq_colab(
instead. Note: unlike read_gbq / read_gbq_query, even if set to
None, this function always assumes {var} refers to a variable
that is supposed to be supplied in this dictionary.
dry_run (bool):
If True, estimates the query results size without returning data.
The return will be a pandas Series with query metadata.
callback (Callable):
A callback function used by bigframes to report query progress.
"""
if pyformat_args is None:
pyformat_args = {}
Expand All @@ -538,6 +544,7 @@ def _read_gbq_colab(
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
callback=callback,
)

@overload
Expand Down
Loading
Loading