Skip to content

Commit 3e584f4

Browse files
authored
Merge branch 'main' into ashleyxu-vector-search
2 parents c36ba63 + 3d39221 commit 3e584f4

File tree

12 files changed

+2036
-402
lines changed

12 files changed

+2036
-402
lines changed

bigframes/_config/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,12 @@ def _init_bigquery_thread_local(self):
6161
@property
6262
def bigquery(self) -> bigquery_options.BigQueryOptions:
6363
"""Options to use with the BigQuery engine."""
64-
if self._local.bigquery_options is not None:
64+
if (
65+
bigquery_options := getattr(self._local, "bigquery_options", None)
66+
) is not None:
6567
# The only way we can get here is if someone called
6668
# _init_bigquery_thread_local.
67-
return self._local.bigquery_options
69+
return bigquery_options
6870

6971
return self._bigquery_options
7072

bigframes/core/blocks.py

Lines changed: 244 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def __init__(
124124
if len(index_columns) == 0:
125125
warnings.warn(
126126
"Creating object with Null Index. Null Index is a preview feature.",
127-
category=bigframes.exceptions.PreviewWarning,
127+
category=bigframes.exceptions.NullIndexPreviewWarning,
128128
)
129129
self._index_columns = tuple(index_columns)
130130
# Index labels don't need complicated hierarchical access so can store as tuple
@@ -155,7 +155,13 @@ def __init__(
155155
self._transpose_cache: Optional[Block] = transpose_cache
156156

157157
@classmethod
158-
def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
158+
def from_local(
159+
cls,
160+
data: pd.DataFrame,
161+
session: bigframes.Session,
162+
*,
163+
cache_transpose: bool = True,
164+
) -> Block:
159165
# Assumes caller has already converted datatypes to bigframes ones.
160166
pd_data = data
161167
column_labels = pd_data.columns
@@ -169,12 +175,21 @@ def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
169175
pd_data = pd_data.reset_index(names=index_ids)
170176
as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
171177
array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
172-
return cls(
178+
block = cls(
173179
array_value,
174180
column_labels=column_labels,
175181
index_columns=index_ids,
176182
index_labels=index_labels,
177183
)
184+
if cache_transpose:
185+
try:
186+
# this cache will help when aligning on axis=1
187+
block = block.with_transpose_cache(
188+
cls.from_local(data.T, session, cache_transpose=False)
189+
)
190+
except Exception:
191+
pass
192+
return block
178193

179194
@property
180195
def index(self) -> BlockIndexProperties:
@@ -724,12 +739,18 @@ def with_column_labels(
724739
f"The column labels size `{len(label_list)} ` should equal to the value"
725740
+ f"columns size: {len(self.value_columns)}."
726741
)
727-
return Block(
742+
block = Block(
728743
self._expr,
729744
index_columns=self.index_columns,
730745
column_labels=label_list,
731746
index_labels=self.index.names,
732747
)
748+
singleton_label = len(list(value)) == 1 and list(value)[0]
749+
if singleton_label is not None and self._transpose_cache is not None:
750+
new_cache, label_id = self._transpose_cache.create_constant(singleton_label)
751+
new_cache = new_cache.set_index([label_id])
752+
block = block.with_transpose_cache(new_cache)
753+
return block
733754

734755
def with_transpose_cache(self, transposed: Block):
735756
return Block(
@@ -1930,10 +1951,169 @@ def merge(
19301951
coalesce_labels=matching_join_labels,
19311952
suffixes=suffixes,
19321953
)
1933-
# Constructs default index
1934-
offset_index_id = guid.generate_guid()
1935-
expr = joined_expr.promote_offsets(offset_index_id)
1936-
return Block(expr, index_columns=[offset_index_id], column_labels=labels)
1954+
1955+
# Construct a default index only if this object and the other both have
1956+
# indexes. In other words, joining anything to a NULL index object
1957+
# keeps everything as a NULL index.
1958+
#
1959+
# This keeps us from generating an index if the user joins a large
1960+
# BigQuery table against small local data, for example.
1961+
if len(self._index_columns) > 0 and len(other._index_columns) > 0:
1962+
offset_index_id = guid.generate_guid()
1963+
expr = joined_expr.promote_offsets(offset_index_id)
1964+
index_columns = [offset_index_id]
1965+
else:
1966+
expr = joined_expr
1967+
index_columns = []
1968+
1969+
return Block(expr, index_columns=index_columns, column_labels=labels)
1970+
1971+
def _align_both_axes(
    self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align this block with ``other`` on both rows and column labels.

    Rows are aligned by joining the two blocks with ``how``. Column labels
    are always combined with an outer join, independent of ``how``.

    Returns:
        A tuple of the row-joined block, the combined column labels, and one
        ``(left, right)`` expression pair per resulting column. A side that
        lacks a given label contributes ``ex.const(None)`` for that column.
    """
    # Row alignment.
    aligned_block, (left_id_map, right_id_map) = self.join(other, how=how)

    # Column alignment. The indexers stay None when the labels already match.
    if self.column_labels.equals(other.column_labels):
        columns = self.column_labels
        left_positions = right_positions = None
    else:
        columns, left_positions, right_positions = self.column_labels.join(
            other.column_labels, how="outer", return_indexers=True
        )
    if left_positions is None:
        left_positions = range(len(columns))
    if right_positions is None:
        right_positions = range(len(columns))

    def left_operand(position):
        # -1 marks a label that does not exist on the left side.
        if position == -1:
            return ex.const(None)
        return ex.free_var(left_id_map[self.value_columns[position]])

    def right_operand(position):
        # -1 marks a label that does not exist on the right side.
        if position == -1:
            return ex.const(None)
        return ex.free_var(right_id_map[other.value_columns[position]])

    left_inputs = [left_operand(position) for position in left_positions]
    right_inputs = [right_operand(position) for position in right_positions]
    return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
2005+
2006+
def _align_axis_0(
    self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align this block row-wise with a block that has one value column.

    The two blocks are joined with ``how``; every value column of ``self``
    is then paired with the single value column of ``other``.

    Returns:
        A tuple of the joined block, this block's column labels (unchanged),
        and one ``(left, right)`` expression pair per value column of
        ``self``.
    """
    assert len(other.value_columns) == 1
    aligned_block, (left_id_map, right_id_map) = self.join(other, how=how)

    right_column_id = other.value_columns[0]
    operand_pairs = [
        (
            ex.free_var(left_id_map[left_column_id]),
            ex.free_var(right_id_map[right_column_id]),
        )
        for left_column_id in self.value_columns
    ]
    return aligned_block, self.column_labels, tuple(operand_pairs)
2021+
2022+
def _align_series_block_axis_1(
    self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align this block with a single-value-column block along axis=1.

    ``other`` is transposed so that its entries become columns, joined onto
    every row of ``self`` via ``join_with_single_row``, and its transposed
    column labels are then matched against this block's column labels using
    ``how``.

    Returns:
        A tuple of the joined block, the combined column labels, and one
        ``(left, right)`` expression pair per resulting column. A side that
        lacks a given label contributes ``ex.const(None)`` for that column.

    Raises:
        ValueError: If ``other`` has no transpose cache.
    """
    assert len(other.value_columns) == 1
    if other._transpose_cache is None:
        raise ValueError(
            "Wrong align method, this approach requires transpose cache"
        )

    # Transpose exactly once: the column ids looked up below must belong to
    # the very block that was joined, and there is no reason to repeat the
    # call for every column.
    other_transposed = other.transpose()
    other_labels = other_transposed.column_labels
    other_value_columns = other_transposed.value_columns

    # Join rows. NOTE(review): join_with_single_row does not validate that
    # the transposed block really has a single row.
    aligned_block, (get_column_left, get_column_right) = join_with_single_row(
        self, other_transposed
    )

    # Join column labels. The indexers stay None when the labels match exactly.
    if self.column_labels.equals(other_labels):
        columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
    else:
        columns, lcol_indexer, rcol_indexer = self.column_labels.join(
            other_labels, how=how, return_indexers=True
        )
    if lcol_indexer is None:
        lcol_indexer = range(len(columns))
    if rcol_indexer is None:
        rcol_indexer = range(len(columns))

    def left_input_lookup(index):
        # -1 marks a label that does not exist on the left side.
        if index == -1:
            return ex.const(None)
        return ex.free_var(get_column_left[self.value_columns[index]])

    def right_input_lookup(index):
        # -1 marks a label that does not exist on the right side.
        if index == -1:
            return ex.const(None)
        return ex.free_var(get_column_right[other_value_columns[index]])

    left_inputs = [left_input_lookup(i) for i in lcol_indexer]
    right_inputs = [right_input_lookup(i) for i in rcol_indexer]
    return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
2066+
2067+
def _align_pd_series_axis_1(
    self, other: pd.Series, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
    """Align this block's columns with the index of a local pandas Series.

    No row join is performed: the values of ``other`` are embedded as
    constant expressions, and ``self`` is returned as the aligned block.

    Returns:
        A tuple of ``self``, the combined column labels, and one
        ``(left, right)`` expression pair per resulting column. A side that
        lacks a given label contributes ``ex.const(None)`` for that column.

    Raises:
        ValueError: If the labels differ and either this block's column
            labels or the index of ``other`` contains duplicates.
    """
    if self.column_labels.equals(other.index):
        # Exact match: no indexers needed.
        columns = self.column_labels
        left_positions = right_positions = None
    else:
        if not self.column_labels.is_unique or not other.index.is_unique:
            raise ValueError("Cannot align non-unique indices")
        columns, left_positions, right_positions = self.column_labels.join(
            other.index, how=how, return_indexers=True
        )
    if left_positions is None:
        left_positions = range(len(columns))
    if right_positions is None:
        right_positions = range(len(columns))

    def left_operand(position):
        # -1 marks a label that does not exist among this block's columns.
        if position == -1:
            return ex.const(None)
        return ex.free_var(self.value_columns[position])

    def right_operand(position):
        # -1 marks a label that does not exist in the Series index.
        if position == -1:
            return ex.const(None)
        return ex.const(other.iloc[position])

    left_inputs = [left_operand(position) for position in left_positions]
    right_inputs = [right_operand(position) for position in right_positions]
    return self, columns, tuple(zip(left_inputs, right_inputs))
2097+
2098+
def _apply_binop(
    self,
    op: ops.BinaryOp,
    inputs: Sequence[Tuple[ex.Expression, ex.Expression]],
    labels: pd.Index,
    reverse: bool = False,
) -> Block:
    """Apply a binary op to each operand pair and keep only the results.

    Args:
        op: The binary operation to apply.
        inputs: ``(left, right)`` expression pairs, one per output column.
        labels: Column labels assigned to the resulting columns.
        reverse: When true, each pair is passed to ``op`` as
            ``(right, left)`` instead of ``(left, right)``.

    Returns:
        A block containing only the projected result columns, labelled with
        ``labels``.
    """
    result_block = self
    result_column_ids = []
    for first, second in inputs:
        if reverse:
            first, second = second, first
        result_block, new_column_id = result_block.project_expr(
            op.as_expr(first, second)
        )
        result_column_ids.append(new_column_id)

    only_results = result_block.select_columns(result_column_ids)
    return only_results.with_column_labels(labels)
19372117

19382118
def join(
19392119
self,
@@ -2256,15 +2436,6 @@ def column_ids(self) -> Sequence[str]:
22562436
"""Column(s) to use as row labels."""
22572437
return self._block._index_columns
22582438

2259-
def __repr__(self) -> str:
2260-
"""Converts an Index to a string."""
2261-
# TODO(swast): Add a timeout here? If the query is taking a long time,
2262-
# maybe we just print the job metadata that we have so far?
2263-
# TODO(swast): Avoid downloading the whole index by using job
2264-
# metadata, like we do with DataFrame.
2265-
preview = self.to_pandas()
2266-
return repr(preview)
2267-
22682439
def to_pandas(self) -> pd.Index:
22692440
"""Executes deferred operations and downloads the results."""
22702441
if len(self.column_ids) == 0:
@@ -2359,6 +2530,61 @@ def join_indexless(
23592530
)
23602531

23612532

2533+
def join_with_single_row(
    left: Block,
    single_row_block: Block,
) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]]]:
    """
    Special join case where other is a single row block.

    The two expressions are cross joined with no join conditions, so every
    row of ``left`` is paired with the row(s) of ``single_row_block``.
    The single-row property is not validated; the caller is responsible for
    not passing a multi-row block.

    Preserves the index of the left block (all index columns and their
    labels) and ignores the index of ``single_row_block``.

    Returns:
        A tuple of the joined block and a pair of mappings
        ``(left, right)`` taken from the join definition, translating each
        side's original column ids to their ids in the joined block.
    """
    left_expr = left.expr
    # Ignore the right side's index columns by selecting value columns only.
    right_expr = single_row_block.expr.select_columns(single_row_block.value_columns)
    left_mappings = [
        join_defs.JoinColumnMapping(
            source_table=join_defs.JoinSide.LEFT,
            source_id=column_id,
            destination_id=guid.generate_guid(),
        )
        for column_id in left_expr.column_ids
    ]
    right_mappings = [
        join_defs.JoinColumnMapping(
            source_table=join_defs.JoinSide.RIGHT,
            source_id=column_id,
            destination_id=guid.generate_guid(),
        )
        for column_id in right_expr.column_ids  # index columns already dropped
    ]

    join_def = join_defs.JoinDefinition(
        conditions=(),
        mappings=(*left_mappings, *right_mappings),
        type="cross",
    )
    combined_expr = left_expr.join(
        right_expr,
        join_def=join_def,
    )
    get_column_left = join_def.get_left_mapping()
    get_column_right = join_def.get_right_mapping()
    # The result is indexed by the left block's index columns, referenced
    # through their post-join ids.
    index_cols_post_join = [
        get_column_left[column_id] for column_id in left.index_columns
    ]

    block = Block(
        combined_expr,
        index_columns=index_cols_post_join,
        column_labels=left.column_labels.append(single_row_block.column_labels),
        # Pass every index label so the labels line up with
        # index_cols_post_join for multi-level (or empty) left indexes too.
        index_labels=left.index.names,
    )
    return (
        block,
        (get_column_left, get_column_right),
    )
2586+
2587+
23622588
def join_mono_indexed(
23632589
left: Block,
23642590
right: Block,
@@ -2546,7 +2772,7 @@ def coalesce_columns(
25462772
) -> Tuple[core.ArrayValue, Sequence[str]]:
25472773
result_ids = []
25482774
for left_id, right_id in zip(left_ids, right_ids):
2549-
if how == "left" or how == "inner":
2775+
if how == "left" or how == "inner" or how == "cross":
25502776
result_ids.append(left_id)
25512777
expr = expr.drop_columns([right_id])
25522778
elif how == "right":

bigframes/core/groupby/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ def size(self) -> series.Series:
560560
by_column_ids=self._by_col_ids,
561561
dropna=self._dropna,
562562
)
563-
return series.Series(agg_block, name=self._value_name)
563+
return series.Series(agg_block.with_column_labels([self._value_name]))
564564

565565
def skew(self, *args, **kwargs) -> series.Series:
566566
block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)

0 commit comments

Comments
 (0)