Commit e6f9292

Merge branch 'main' into main_chelsealin_jsonfunctions
2 parents: 3f77f54 + 1f6442e

28 files changed: +428 −146 lines

bigframes/bigquery/_operations/sql.py
Lines changed: 7 additions & 6 deletions

```diff
@@ -20,6 +20,7 @@
 
 import google.cloud.bigquery
 
+import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir
 import bigframes.core.sql
 import bigframes.dataframe
 import bigframes.dtypes
@@ -72,16 +73,16 @@ def sql_scalar(
     # Another benefit of this is that if there is a syntax error in the SQL
     # template, then this will fail with an error earlier in the process,
     # aiding users in debugging.
-    base_series = columns[0]
-    literals = [
-        bigframes.dtypes.bigframes_dtype_to_literal(column.dtype) for column in columns
+    literals_sql = [
+        sqlglot_ir._literal(None, column.dtype).sql(dialect="bigquery")
+        for column in columns
     ]
-    literals_sql = [bigframes.core.sql.simple_literal(literal) for literal in literals]
+    select_sql = sql_template.format(*literals_sql)
+    dry_run_sql = f"SELECT {select_sql}"
 
     # Use the executor directly, because we want the original column IDs, not
     # the user-friendly column names that block.to_sql_query() would produce.
-    select_sql = sql_template.format(*literals_sql)
-    dry_run_sql = f"SELECT {select_sql}"
+    base_series = columns[0]
     bqclient = base_series._session.bqclient
     job = bqclient.query(
         dry_run_sql, job_config=google.cloud.bigquery.QueryJobConfig(dry_run=True)
```
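The dry-run trick above validates a user's SQL template before touching real data: typed NULL literals stand in for each column, and BigQuery parses the query without executing it. A minimal sketch of the same pattern, assuming a hypothetical two-placeholder template (`QueryJobConfig(dry_run=True)` is the real client API; everything else here is illustrative):

```python
import google.cloud.bigquery

bqclient = google.cloud.bigquery.Client()

# Hypothetical user template with two column placeholders.
sql_template = "CAST({0} AS STRING) || {1}"
# Typed NULL literals stand in for the real columns during validation,
# mirroring sqlglot_ir._literal(None, dtype) in the diff above.
literals_sql = ["CAST(NULL AS INT64)", "CAST(NULL AS STRING)"]

dry_run_sql = f"SELECT {sql_template.format(*literals_sql)}"
# A dry-run job never executes; a syntax error in the template raises
# here instead of failing later against real data.
job = bqclient.query(
    dry_run_sql,
    job_config=google.cloud.bigquery.QueryJobConfig(dry_run=True),
)
```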

bigframes/constants.py
Lines changed: 4 additions & 0 deletions

```diff
@@ -128,4 +128,8 @@
 # BigQuery default is 10000, leave 100 for overhead
 MAX_COLUMNS = 9900
 
+# BigQuery has 1 MB query size limit. Don't want to take up more than a few % of that inlining a table.
+# Also must assume that text encoding as literals is much less efficient than in-memory representation.
+MAX_INLINE_BYTES = 5000
+
 SUGGEST_PEEK_PREVIEW = "Use .peek(n) to preview n arbitrary rows."
```
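For scale: pandas' `memory_usage(deep=True)` is the kind of in-memory estimate this threshold gets compared against, and per the comment it understates the size of the same data once rendered as SQL literal text. A small illustrative check (the frame here is hypothetical):

```python
import pandas as pd

MAX_INLINE_BYTES = 5000  # value introduced in this diff

df = pd.DataFrame({"x": range(100), "y": ["label"] * 100})
mem_usage = int(df.memory_usage(deep=True).sum())
# Frames under the threshold are candidates for inlining as SQL literals.
print(mem_usage, mem_usage <= MAX_INLINE_BYTES)
```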

bigframes/core/array_value.py
Lines changed: 10 additions & 1 deletion

```diff
@@ -133,8 +133,17 @@ def from_table(
             ordering=ordering,
             n_rows=n_rows,
         )
+        return cls.from_bq_data_source(source_def, scan_list, session)
+
+    @classmethod
+    def from_bq_data_source(
+        cls,
+        source: nodes.BigqueryDataSource,
+        scan_list: nodes.ScanList,
+        session: Session,
+    ):
         node = nodes.ReadTableNode(
-            source=source_def,
+            source=source,
             scan_list=scan_list,
             table_session=session,
         )
```
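The refactor splits table resolution from node construction so callers that already hold a `BigqueryDataSource` can build an `ArrayValue` directly. A toy sketch of the delegation pattern (the classes here are simplified stand-ins, not the bigframes types):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Source:
    """Stand-in for nodes.BigqueryDataSource."""

    table_name: str


class Value:
    """Stand-in for ArrayValue."""

    def __init__(self, source: Source):
        self.source = source

    @classmethod
    def from_table(cls, table_name: str) -> "Value":
        source = Source(table_name)  # resolve the table into a source def...
        return cls.from_source(source)  # ...then delegate

    @classmethod
    def from_source(cls, source: Source) -> "Value":
        # Narrower entry point: no table resolution, just node construction.
        return cls(source)
```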

bigframes/core/compile/sqlglot/sqlglot_ir.py
Lines changed: 7 additions & 0 deletions

```diff
@@ -18,6 +18,7 @@
 import typing
 
 from google.cloud import bigquery
+import numpy as np
 import pyarrow as pa
 import sqlglot as sg
 import sqlglot.dialects.bigquery
@@ -213,7 +214,11 @@ def _literal(value: typing.Any, dtype: dtypes.Dtype) -> sge.Expression:
     elif dtype == dtypes.BYTES_DTYPE:
         return _cast(str(value), sqlglot_type)
     elif dtypes.is_time_like(dtype):
+        if isinstance(value, np.generic):
+            value = value.item()
         return _cast(sge.convert(value.isoformat()), sqlglot_type)
+    elif dtype in (dtypes.NUMERIC_DTYPE, dtypes.BIGNUMERIC_DTYPE):
+        return _cast(sge.convert(value), sqlglot_type)
     elif dtypes.is_geo_like(dtype):
         wkt = value if isinstance(value, str) else to_wkt(value)
         return sge.func("ST_GEOGFROMTEXT", sge.convert(wkt))
@@ -234,6 +239,8 @@ def _literal(value: typing.Any, dtype: dtypes.Dtype) -> sge.Expression:
         )
         return values if len(value) > 0 else _cast(values, sqlglot_type)
     else:
+        if isinstance(value, np.generic):
+            value = value.item()
         return sge.convert(value)
```
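Both new `np.generic` branches exist because scalars pulled out of NumPy-backed arrays arrive as NumPy scalar types, which don't always behave like the built-ins that sqlglot's `convert()` expects; `.item()` unwraps them. A quick standalone illustration (real sqlglot API, hypothetical value):

```python
import numpy as np
import sqlglot.expressions as sge

value = np.int64(42)  # e.g. a scalar from a NumPy-backed column
if isinstance(value, np.generic):
    value = value.item()  # np.int64 -> plain int

expr = sge.convert(value)  # build a sqlglot literal expression
print(expr.sql(dialect="bigquery"))  # 42
```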

bigframes/core/indexers.py
Lines changed: 2 additions & 2 deletions

```diff
@@ -155,8 +155,8 @@ def __getitem__(self, key):
         # row key. We must choose one, so bias towards treating as multi-part row label
         if isinstance(key, tuple) and len(key) == 2:
             is_row_multi_index = self._dataframe.index.nlevels > 1
-            is_first_item_tuple = isinstance(key[0], tuple)
-            if not is_row_multi_index or is_first_item_tuple:
+            is_first_item_list_or_tuple = isinstance(key[0], (tuple, list))
+            if not is_row_multi_index or is_first_item_list_or_tuple:
                 df = typing.cast(
                     bigframes.dataframe.DataFrame,
                     _loc_getitem_series_or_dataframe(self._dataframe, key[0]),
```
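The ambiguity being resolved mirrors plain pandas, which bigframes follows: on a multi-index frame, `df.loc[x, y]` could mean a two-part row label or a (row, column) pair. A tuple first item was already treated as a row selector; this change extends that to lists. Illustrated with pandas:

```python
import pandas as pd

idx = pd.MultiIndex.from_tuples([("a", 1), ("b", 2)])
df = pd.DataFrame({"col": [10, 20]}, index=idx)

# Tuple key: read as a multi-part row label.
print(df.loc[("a", 1)])
# List first item: unambiguously rows, so the second item is the column.
print(df.loc[[("a", 1)], "col"])
```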

bigframes/core/nodes.py
Lines changed: 19 additions & 1 deletion

```diff
@@ -578,6 +578,9 @@ class ScanItem(typing.NamedTuple):
     def with_id(self, id: identifiers.ColumnId) -> ScanItem:
         return ScanItem(id, self.dtype, self.source_id)
 
+    def with_source_id(self, source_id: str) -> ScanItem:
+        return ScanItem(self.id, self.dtype, source_id)
+
 
 @dataclasses.dataclass(frozen=True)
 class ScanList:
@@ -614,16 +617,31 @@ def project(
         result = ScanList((self.items[:1]))
         return result
 
+    def remap_source_ids(
+        self,
+        mapping: Mapping[str, str],
+    ) -> ScanList:
+        items = tuple(
+            item.with_source_id(mapping.get(item.source_id, item.source_id))
+            for item in self.items
+        )
+        return ScanList(items)
+
+    def append(
+        self, source_id: str, dtype: bigframes.dtypes.Dtype, id: identifiers.ColumnId
+    ) -> ScanList:
+        return ScanList((*self.items, ScanItem(id, dtype, source_id)))
+
 
 @dataclasses.dataclass(frozen=True, eq=False)
 class ReadLocalNode(LeafNode):
     # TODO: Track nullability for local data
     local_data_source: local_data.ManagedArrowTable
     # Mapping of local ids to bfet id.
     scan_list: ScanList
+    session: bigframes.session.Session
     # Offsets are generated only if this is non-null
     offsets_col: Optional[identifiers.ColumnId] = None
-    session: typing.Optional[bigframes.session.Session] = None
 
     @property
     def fields(self) -> Sequence[Field]:
```
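`ScanItem` and `ScanList` are frozen, so every helper returns a fresh object rather than mutating in place. A self-contained sketch of the `remap_source_ids` pattern, with simplified field types (`str` in place of `ColumnId`/`Dtype`):

```python
import typing


class ScanItem(typing.NamedTuple):
    id: str
    dtype: str
    source_id: str

    def with_source_id(self, source_id: str) -> "ScanItem":
        # Build a new item; NamedTuples are immutable.
        return ScanItem(self.id, self.dtype, source_id)


items = (ScanItem("c0", "Int64", "col_a"), ScanItem("c1", "string", "col_b"))
mapping = {"col_a": "col_a_renamed"}
# Missing keys fall back to the existing source_id, as in the diff.
remapped = tuple(
    item.with_source_id(mapping.get(item.source_id, item.source_id))
    for item in items
)
print(remapped)
```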

bigframes/dtypes.py
Lines changed: 0 additions & 27 deletions

```diff
@@ -499,33 +499,6 @@ def bigframes_dtype_to_arrow_dtype(
     )
 
 
-def bigframes_dtype_to_literal(
-    bigframes_dtype: Dtype,
-) -> Any:
-    """Create a representative literal value for a bigframes dtype.
-
-    The inverse of infer_literal_type().
-    """
-    if isinstance(bigframes_dtype, pd.ArrowDtype):
-        arrow_type = bigframes_dtype.pyarrow_dtype
-        return arrow_type_to_literal(arrow_type)
-
-    if isinstance(bigframes_dtype, pd.Float64Dtype):
-        return 1.0
-    if isinstance(bigframes_dtype, pd.Int64Dtype):
-        return 1
-    if isinstance(bigframes_dtype, pd.BooleanDtype):
-        return True
-    if isinstance(bigframes_dtype, pd.StringDtype):
-        return "string"
-    if isinstance(bigframes_dtype, gpd.array.GeometryDtype):
-        return shapely.geometry.Point((0, 0))
-
-    raise TypeError(
-        f"No literal conversion for {bigframes_dtype}. {constants.FEEDBACK_LINK}"
-    )
-
-
 def arrow_type_to_literal(
     arrow_type: pa.DataType,
 ) -> Any:
```

bigframes/session/__init__.py
Lines changed: 16 additions & 10 deletions

```diff
@@ -60,6 +60,7 @@
 from bigframes import version
 import bigframes._config.bigquery_options as bigquery_options
 import bigframes.clients
+import bigframes.constants
 from bigframes.core import blocks, log_adapter
 import bigframes.core.pyformat
 
@@ -248,13 +249,6 @@ def __init__(
         self._temp_storage_manager = (
             self._session_resource_manager or self._anon_dataset_manager
         )
-        self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
-            bqclient=self._clients_provider.bqclient,
-            bqstoragereadclient=self._clients_provider.bqstoragereadclient,
-            storage_manager=self._temp_storage_manager,
-            strictly_ordered=self._strictly_ordered,
-            metrics=self._metrics,
-        )
         self._loader = bigframes.session.loader.GbqDataLoader(
             session=self,
             bqclient=self._clients_provider.bqclient,
@@ -265,6 +259,14 @@ def __init__(
             force_total_order=self._strictly_ordered,
             metrics=self._metrics,
         )
+        self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
+            bqclient=self._clients_provider.bqclient,
+            bqstoragereadclient=self._clients_provider.bqstoragereadclient,
+            loader=self._loader,
+            storage_manager=self._temp_storage_manager,
+            strictly_ordered=self._strictly_ordered,
+            metrics=self._metrics,
+        )
 
     def __del__(self):
         """Automatic cleanup of internal resources."""
@@ -937,15 +939,15 @@ def _read_pandas(
         if write_engine == "default":
             write_engine = (
                 "bigquery_load"
-                if mem_usage > MAX_INLINE_DF_BYTES
+                if mem_usage > bigframes.constants.MAX_INLINE_BYTES
                 else "bigquery_inline"
             )
 
         if write_engine == "bigquery_inline":
-            if mem_usage > MAX_INLINE_DF_BYTES:
+            if mem_usage > bigframes.constants.MAX_INLINE_BYTES:
                 raise ValueError(
                     f"DataFrame size ({mem_usage} bytes) exceeds the maximum allowed "
-                    f"for inline data ({MAX_INLINE_DF_BYTES} bytes)."
+                    f"for inline data ({bigframes.constants.MAX_INLINE_BYTES} bytes)."
                 )
             return self._read_pandas_inline(pandas_dataframe)
         elif write_engine == "bigquery_load":
@@ -954,6 +956,10 @@ def _read_pandas(
             return self._loader.read_pandas(pandas_dataframe, method="stream")
         elif write_engine == "bigquery_write":
             return self._loader.read_pandas(pandas_dataframe, method="write")
+        elif write_engine == "_deferred":
+            import bigframes.dataframe as dataframe
+
+            return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self))
         else:
            raise ValueError(f"Got unexpected write_engine '{write_engine}'")
```
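Note the constructor reorder in `__init__`: the executor now takes `loader=self._loader`, so the loader must be built first. The engine-selection rule in `_read_pandas` then boils down to a size check against the new constant, with the `_deferred` branch keeping the frame local until execution. A condensed sketch of just the dispatch logic (the helper name is hypothetical):

```python
MAX_INLINE_BYTES = 5000  # from bigframes/constants.py in this commit


def choose_write_engine(mem_usage: int, write_engine: str = "default") -> str:
    if write_engine == "default":
        # Small frames inline as SQL literals; larger ones use a load job.
        return "bigquery_load" if mem_usage > MAX_INLINE_BYTES else "bigquery_inline"
    if write_engine == "bigquery_inline" and mem_usage > MAX_INLINE_BYTES:
        raise ValueError(
            f"DataFrame size ({mem_usage} bytes) exceeds the maximum allowed "
            f"for inline data ({MAX_INLINE_BYTES} bytes)."
        )
    return write_engine


print(choose_write_engine(1_200))   # bigquery_inline
print(choose_write_engine(80_000))  # bigquery_load
```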
