
Commit 74b1b65

perf: Defer some data uploads to execution time
1 parent c46ad06 commit 74b1b65

2 files changed: +75 -40 lines changed


bigframes/core/array_value.py

Lines changed: 10 additions & 1 deletion
@@ -133,8 +133,17 @@ def from_table(
             ordering=ordering,
             n_rows=n_rows,
         )
+        return cls.from_bq_data_source(source_def, scan_list, session)
+
+    @classmethod
+    def from_bq_data_source(
+        cls,
+        source: nodes.BigqueryDataSource,
+        scan_list: nodes.ScanList,
+        session: Session,
+    ):
         node = nodes.ReadTableNode(
-            source=source_def,
+            source=source,
             scan_list=scan_list,
             table_session=session,
         )
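
The new from_bq_data_source classmethod builds an ArrayValue directly from an already-described BigQuery source plus a scan list, which is what lets the loader hand over an uploaded table without re-deriving its metadata. A minimal caller-side sketch, assuming only the constructor arguments visible in the loader changes below; the helper function name is illustrative, not part of the commit:

# Sketch (not part of the commit): wrap an uploaded temp table as an ArrayValue.
# destination_table is a google.cloud.bigquery.Table, schema a bigframes schema;
# both are assumed inputs here.
from bigframes.core import identifiers, nodes, ordering
import bigframes.core as core

def array_value_from_uploaded_table(destination_table, schema, offsets_col, session):
    # Describe the uploaded table and its total ordering without scanning it yet.
    source = nodes.BigqueryDataSource(
        destination_table,
        ordering=ordering.TotalOrdering.from_offset_col(offsets_col),
        n_rows=destination_table.num_rows,
    )
    # One ScanItem per logical column; the offsets column can be dropped later.
    scan_list = nodes.ScanList(
        tuple(
            nodes.ScanItem(identifiers.ColumnId(item.column), item.dtype, item.column)
            for item in schema.items
        )
    )
    return core.ArrayValue.from_bq_data_source(source, scan_list, session)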

bigframes/session/loader.py

Lines changed: 65 additions & 39 deletions
@@ -43,7 +43,7 @@
 import pandas
 import pyarrow as pa
 
-from bigframes.core import guid, local_data, utils
+from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils
 import bigframes.core as core
 import bigframes.core.blocks as blocks
 import bigframes.core.schema as schemata
@@ -183,35 +183,59 @@ def read_pandas(
         )
         managed_data = local_data.ManagedArrowTable.from_pandas(prepared_df)
 
+        block = blocks.Block(
+            self.read_managed_data(managed_data, method=method, api_name=api_name),
+            index_columns=idx_cols,
+            column_labels=pandas_dataframe.columns,
+            index_labels=pandas_dataframe.index.names,
+        )
+        return dataframe.DataFrame(block)
+
+    def read_managed_data(
+        self,
+        data: local_data.ManagedArrowTable,
+        method: Literal["load", "stream", "write"],
+        api_name: str,
+    ) -> core.ArrayValue:
+        offsets_col = guid.generate_guid("upload_offsets_")
         if method == "load":
-            array_value = self.load_data(managed_data, api_name=api_name)
+            gbq_source = self.load_data(
+                data, offsets_col=offsets_col, api_name=api_name
+            )
         elif method == "stream":
-            array_value = self.stream_data(managed_data)
+            gbq_source = self.stream_data(data, offsets_col=offsets_col)
         elif method == "write":
-            array_value = self.write_data(managed_data)
+            gbq_source = self.write_data(data, offsets_col=offsets_col)
         else:
             raise ValueError(f"Unsupported read method {method}")
 
-        block = blocks.Block(
-            array_value,
-            index_columns=idx_cols,
-            column_labels=pandas_dataframe.columns,
-            index_labels=pandas_dataframe.index.names,
+        return core.ArrayValue.from_bq_data_source(
+            source=gbq_source,
+            scan_list=nodes.ScanList(
+                tuple(
+                    nodes.ScanItem(
+                        identifiers.ColumnId(item.column), item.dtype, item.column
+                    )
+                    for item in data.schema.items
+                )
+            ),
+            session=self._session,
         )
-        return dataframe.DataFrame(block)
 
     def load_data(
-        self, data: local_data.ManagedArrowTable, api_name: Optional[str] = None
-    ) -> core.ArrayValue:
+        self,
+        data: local_data.ManagedArrowTable,
+        offsets_col: str,
+        api_name: Optional[str] = None,
+    ) -> nodes.BigqueryDataSource:
         """Load managed data into bigquery"""
-        ordering_col = guid.generate_guid("load_offsets_")
 
         # JSON support incomplete
         for item in data.schema.items:
             _validate_dtype_can_load(item.column, item.dtype)
 
         schema_w_offsets = data.schema.append(
-            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
+            schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
         bq_schema = schema_w_offsets.to_bigquery(_LOAD_JOB_TYPE_OVERRIDES)
 
@@ -222,13 +246,13 @@ def load_data(
             job_config.labels = {"bigframes-api": api_name}
 
         load_table_destination = self._storage_manager.create_temp_table(
-            bq_schema, [ordering_col]
+            bq_schema, [offsets_col]
         )
 
         buffer = io.BytesIO()
         data.to_parquet(
             buffer,
-            offsets_col=ordering_col,
+            offsets_col=offsets_col,
             geo_format="wkt",
             duration_type="duration",
             json_type="string",
@@ -240,23 +264,24 @@ def load_data(
         self._start_generic_job(load_job)
         # must get table metadata after load job for accurate metadata
         destination_table = self._bqclient.get_table(load_table_destination)
-        return core.ArrayValue.from_table(
-            table=destination_table,
-            schema=schema_w_offsets,
-            session=self._session,
-            offsets_col=ordering_col,
-            n_rows=data.data.num_rows,
-        ).drop_columns([ordering_col])
+        return nodes.BigqueryDataSource(
+            destination_table,
+            ordering=ordering.TotalOrdering.from_offset_col(offsets_col),
+            n_rows=destination_table.num_rows,
+        )
 
-    def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
+    def stream_data(
+        self,
+        data: local_data.ManagedArrowTable,
+        offsets_col: str,
+    ) -> nodes.BigqueryDataSource:
         """Load managed data into bigquery"""
-        ordering_col = guid.generate_guid("stream_offsets_")
         schema_w_offsets = data.schema.append(
-            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
+            schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
         bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES)
         load_table_destination = self._storage_manager.create_temp_table(
-            bq_schema, [ordering_col]
+            bq_schema, [offsets_col]
         )
 
         rows = data.itertuples(
@@ -279,20 +304,21 @@ def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
             table=destination_table,
             schema=schema_w_offsets,
             session=self._session,
-            offsets_col=ordering_col,
+            offsets_col=offsets_col,
             n_rows=data.data.num_rows,
-        ).drop_columns([ordering_col])
+        ).drop_columns([offsets_col])
 
-    def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
+    def write_data(
+        self,
+        data: local_data.ManagedArrowTable,
+        offsets_col: str,
+    ) -> nodes.BigqueryDataSource:
         """Load managed data into bigquery"""
-        ordering_col = guid.generate_guid("stream_offsets_")
         schema_w_offsets = data.schema.append(
-            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
+            schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
         bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES)
-        bq_table_ref = self._storage_manager.create_temp_table(
-            bq_schema, [ordering_col]
-        )
+        bq_table_ref = self._storage_manager.create_temp_table(bq_schema, [offsets_col])
 
         requested_stream = bq_storage_types.stream.WriteStream()
         requested_stream.type_ = bq_storage_types.stream.WriteStream.Type.COMMITTED  # type: ignore
@@ -304,7 +330,7 @@ def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
 
         def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
             schema, batches = data.to_arrow(
-                offsets_col=ordering_col, duration_type="int"
+                offsets_col=offsets_col, duration_type="int"
             )
             offset = 0
             for batch in batches:
@@ -334,9 +360,9 @@ def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
             table=destination_table,
             schema=schema_w_offsets,
             session=self._session,
-            offsets_col=ordering_col,
+            offsets_col=offsets_col,
             n_rows=data.data.num_rows,
-        ).drop_columns([ordering_col])
+        ).drop_columns([offsets_col])
 
     def _start_generic_job(self, job: formatting_helpers.GenericJob):
         if bigframes.options.display.progress_bar is not None:
@@ -533,7 +559,7 @@ def read_gbq_table(
         if not primary_key:
             array_value = array_value.order_by(
                 [
-                    bigframes.core.ordering.OrderingExpression(
+                    ordering.OrderingExpression(
                         bigframes.operations.RowKey().as_expr(
                             *(id for id in array_value.column_ids)
                         ),
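
Across the three upload paths (load_data, stream_data, write_data), the offsets column name is now generated once by the caller and threaded through, and load_data returns a nodes.BigqueryDataSource description rather than an eager core.ArrayValue. A small sketch of that shared pattern, using only names that appear in this diff; the describe_upload helper is illustrative and not part of the commit:

# Sketch (not part of the commit): the caller generates one offsets column name
# and passes it to whichever upload path is used, so every path can append an
# INT64 offsets column and report a total ordering over the uploaded rows.
from bigframes.core import guid, local_data, ordering
import bigframes.core.schema as schemata
import bigframes.dtypes

def describe_upload(data: local_data.ManagedArrowTable):
    offsets_col = guid.generate_guid("upload_offsets_")
    # Each upload path appends the offsets column to the local schema...
    schema_w_offsets = data.schema.append(
        schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
    )
    # ...and the resulting temp table is described by its total ordering.
    total_ordering = ordering.TotalOrdering.from_offset_col(offsets_col)
    return offsets_col, schema_w_offsets, total_ordering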
