Skip to content

Commit dbb66e2

Browse files
Merge remote-tracking branch 'github/main' into align_df_series
2 parents 123bbbb + 0e25a3b commit dbb66e2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1991
-330
lines changed

.kokoro/continuous/e2e.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Only run this nox session.
44
env_vars: {
55
key: "NOX_SESSION"
6-
value: "unit_prerelease system_prerelease system_noextras e2e notebook"
6+
value: "e2e doctest notebook unit_prerelease system_prerelease system_noextras"
77
}
88

99
env_vars: {

.kokoro/presubmit/e2e.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Only run this nox session.
44
env_vars: {
55
key: "NOX_SESSION"
6-
value: "unit_prerelease system_prerelease system_noextras e2e notebook"
6+
value: "e2e doctest notebook unit_prerelease system_prerelease system_noextras"
77
}
88

99
env_vars: {

bigframes/constants.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import datetime
16-
1715
"""Constants used across BigQuery DataFrames.
1816
1917
This module should not depend on any others in the package.
2018
"""
2119

22-
FEEDBACK_LINK = (
23-
"Share your usecase with the BigQuery DataFrames team at the "
24-
"https://bit.ly/bigframes-feedback survey."
25-
)
20+
import datetime
21+
22+
import bigframes_vendored.constants
2623

27-
ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}"
24+
FEEDBACK_LINK = bigframes_vendored.constants.FEEDBACK_LINK
25+
ABSTRACT_METHOD_ERROR_MESSAGE = (
26+
bigframes_vendored.constants.ABSTRACT_METHOD_ERROR_MESSAGE
27+
)
2828

2929
DEFAULT_EXPIRATION = datetime.timedelta(days=7)
3030

bigframes/core/blocks.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,34 @@ def aggregate_all_and_stack(
10361036
index_labels=self.index.names,
10371037
)
10381038

1039+
def aggregate_size(
1040+
self,
1041+
by_column_ids: typing.Sequence[str] = (),
1042+
*,
1043+
dropna: bool = True,
1044+
):
1045+
"""Returns a block object to compute the size(s) of groups."""
1046+
agg_specs = [
1047+
(ex.NullaryAggregation(agg_ops.SizeOp()), guid.generate_guid()),
1048+
]
1049+
output_col_ids = [agg_spec[1] for agg_spec in agg_specs]
1050+
result_expr = self.expr.aggregate(agg_specs, by_column_ids, dropna=dropna)
1051+
names: typing.List[Label] = []
1052+
for by_col_id in by_column_ids:
1053+
if by_col_id in self.value_columns:
1054+
names.append(self.col_id_to_label[by_col_id])
1055+
else:
1056+
names.append(self.col_id_to_index_name[by_col_id])
1057+
return (
1058+
Block(
1059+
result_expr,
1060+
index_columns=by_column_ids,
1061+
column_labels=["size"],
1062+
index_labels=names,
1063+
),
1064+
output_col_ids,
1065+
)
1066+
10391067
def select_column(self, id: str) -> Block:
10401068
return self.select_columns([id])
10411069

bigframes/core/compile/aggregate_compiler.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ def compile_aggregate(
3636
bindings: typing.Dict[str, ibis_types.Value],
3737
order_by: typing.Sequence[ibis_types.Value] = [],
3838
) -> ibis_types.Value:
39+
if isinstance(aggregate, ex.NullaryAggregation):
40+
return compile_nullary_agg(aggregate.op)
3941
if isinstance(aggregate, ex.UnaryAggregation):
4042
input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings)
4143
if aggregate.op.can_order_by:
@@ -55,7 +57,9 @@ def compile_analytic(
5557
window: window_spec.WindowSpec,
5658
bindings: typing.Dict[str, ibis_types.Value],
5759
) -> ibis_types.Value:
58-
if isinstance(aggregate, ex.UnaryAggregation):
60+
if isinstance(aggregate, ex.NullaryAggregation):
61+
return compile_nullary_agg(aggregate.op, window)
62+
elif isinstance(aggregate, ex.UnaryAggregation):
5963
input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings)
6064
return compile_unary_agg(aggregate.op, input, window)
6165
elif isinstance(aggregate, ex.BinaryAggregation):
@@ -93,6 +97,14 @@ def compile_ordered_unary_agg(
9397
raise ValueError(f"Can't compile unrecognized operation: {op}")
9498

9599

100+
@functools.singledispatch
101+
def compile_nullary_agg(
102+
op: agg_ops.WindowOp,
103+
window: Optional[window_spec.WindowSpec] = None,
104+
) -> ibis_types.Value:
105+
raise ValueError(f"Can't compile unrecognized operation: {op}")
106+
107+
96108
def numeric_op(operation):
97109
@functools.wraps(operation)
98110
def constrained_op(
@@ -118,6 +130,11 @@ def constrained_op(
118130
### Specific Op implementations Below
119131

120132

133+
@compile_nullary_agg.register
134+
def _(op: agg_ops.SizeOp, window=None) -> ibis_types.NumericValue:
135+
return _apply_window_if_present(vendored_ibis_ops.count(1), window)
136+
137+
121138
@compile_unary_agg.register
122139
@numeric_op
123140
def _(

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -856,11 +856,12 @@ def to_timestamp_op_impl(x: ibis_types.Value, op: ops.ToTimestampOp):
856856

857857
@scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True)
858858
def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp):
859-
if not hasattr(op.func, "bigframes_remote_function"):
859+
ibis_node = getattr(op.func, "ibis_node", None)
860+
if ibis_node is None:
860861
raise TypeError(
861862
f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
862863
)
863-
x_transformed = op.func(x)
864+
x_transformed = ibis_node(x)
864865
if not op.apply_on_null:
865866
x_transformed = ibis.case().when(x.isnull(), x).else_(x_transformed).end()
866867
return x_transformed
@@ -1342,11 +1343,12 @@ def minimum_impl(
13421343
def binary_remote_function_op_impl(
13431344
x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp
13441345
):
1345-
if not hasattr(op.func, "bigframes_remote_function"):
1346+
ibis_node = getattr(op.func, "ibis_node", None)
1347+
if ibis_node is None:
13461348
raise TypeError(
13471349
f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
13481350
)
1349-
x_transformed = op.func(x, y)
1351+
x_transformed = ibis_node(x, y)
13501352
return x_transformed
13511353

13521354

bigframes/core/expression.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ def output_type(
4646
...
4747

4848

49+
@dataclasses.dataclass(frozen=True)
50+
class NullaryAggregation(Aggregation):
51+
op: agg_ops.NullaryWindowOp = dataclasses.field()
52+
53+
def output_type(
54+
self, input_types: dict[str, bigframes.dtypes.Dtype]
55+
) -> dtypes.ExpressionType:
56+
return self.op.output_type()
57+
58+
4959
@dataclasses.dataclass(frozen=True)
5060
class UnaryAggregation(Aggregation):
5161
op: agg_ops.UnaryWindowOp = dataclasses.field()

bigframes/core/groupby/__init__.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,20 @@ def __getitem__(
104104
dropna=self._dropna,
105105
)
106106

107+
def size(self) -> typing.Union[df.DataFrame, series.Series]:
108+
agg_block, _ = self._block.aggregate_size(
109+
by_column_ids=self._by_col_ids,
110+
dropna=self._dropna,
111+
)
112+
agg_block = agg_block.with_column_labels(pd.Index(["size"]))
113+
dataframe = df.DataFrame(agg_block)
114+
115+
if self._as_index:
116+
series = dataframe["size"]
117+
return series.rename(None)
118+
else:
119+
return self._convert_index(dataframe)
120+
107121
def sum(self, numeric_only: bool = False, *args) -> df.DataFrame:
108122
if not numeric_only:
109123
self._raise_on_non_numeric("sum")
@@ -520,6 +534,13 @@ def std(self, *args, **kwargs) -> series.Series:
520534
def var(self, *args, **kwargs) -> series.Series:
521535
return self._aggregate(agg_ops.var_op)
522536

537+
def size(self) -> series.Series:
538+
agg_block, _ = self._block.aggregate_size(
539+
by_column_ids=self._by_col_ids,
540+
dropna=self._dropna,
541+
)
542+
return series.Series(agg_block, name=self._value_name)
543+
523544
def skew(self, *args, **kwargs) -> series.Series:
524545
block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
525546
return series.Series(block)

bigframes/core/log_adapter.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,12 @@ def add_api_method(api_method_name):
9999
_api_methods = _api_methods[:MAX_LABELS_COUNT]
100100

101101

102-
def get_and_reset_api_methods():
102+
def get_and_reset_api_methods(dry_run: bool = False):
103103
global _lock
104104
with _lock:
105105
previous_api_methods = list(_api_methods)
106-
_api_methods.clear()
106+
107+
# dry_run might not make a job resource, so only reset the log on real queries.
108+
if not dry_run:
109+
_api_methods.clear()
107110
return previous_api_methods

bigframes/dataframe.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -400,11 +400,12 @@ def memory_usage(self, index: bool = True):
400400
column_sizes = self.dtypes.map(
401401
lambda dtype: bigframes.dtypes.DTYPE_BYTE_SIZES.get(dtype, 8) * n_rows
402402
)
403-
if index:
403+
if index and self._has_index:
404404
index_size = pandas.Series([self.index._memory_usage()], index=["Index"])
405405
column_sizes = pandas.concat([index_size, column_sizes])
406406
return column_sizes
407407

408+
@requires_index
408409
def info(
409410
self,
410411
verbose: Optional[bool] = None,
@@ -778,7 +779,7 @@ def _apply_series_binop_axis_0(
778779
block = block.drop_columns([get_column_left[column_id]])
779780

780781
block = block.drop_columns([series_col])
781-
block = block.with_index_labels(self.index.names)
782+
block = block.with_index_labels(self._block.index.names)
782783
return DataFrame(block)
783784

784785
def _apply_series_binop_axis_1(
@@ -914,6 +915,11 @@ def __ne__(self, other) -> DataFrame: # type: ignore
914915

915916
__ne__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__ne__)
916917

918+
def __invert__(self) -> DataFrame:
919+
return self._apply_unary_op(ops.invert_op)
920+
921+
__invert__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__invert__)
922+
917923
def le(self, other: typing.Any, axis: str | int = "columns") -> DataFrame:
918924
return self._apply_binop(other, ops.le_op, axis=axis)
919925

@@ -1622,7 +1628,7 @@ def _assign_series_join_on_index(
16221628
# Update case, remove after copying into columns
16231629
block = block.drop_columns([source_column])
16241630

1625-
return DataFrame(block.with_index_labels(self.index.names))
1631+
return DataFrame(block.with_index_labels(self._block.index.names))
16261632

16271633
def reset_index(self, *, drop: bool = False) -> DataFrame:
16281634
block = self._block.reset_index(drop)
@@ -2917,7 +2923,9 @@ def to_csv(
29172923
field_delimiter=sep,
29182924
header=header,
29192925
)
2920-
_, query_job = self._block.expr.session._start_query(export_data_statement)
2926+
_, query_job = self._block.expr.session._start_query(
2927+
export_data_statement, api_name="dataframe-to_csv"
2928+
)
29212929
self._set_internal_query_job(query_job)
29222930

29232931
def to_json(
@@ -2959,7 +2967,9 @@ def to_json(
29592967
format="JSON",
29602968
export_options={},
29612969
)
2962-
_, query_job = self._block.expr.session._start_query(export_data_statement)
2970+
_, query_job = self._block.expr.session._start_query(
2971+
export_data_statement, api_name="dataframe-to_json"
2972+
)
29632973
self._set_internal_query_job(query_job)
29642974

29652975
def to_gbq(
@@ -3091,7 +3101,9 @@ def to_parquet(
30913101
format="PARQUET",
30923102
export_options=export_options,
30933103
)
3094-
_, query_job = self._block.expr.session._start_query(export_data_statement)
3104+
_, query_job = self._block.expr.session._start_query(
3105+
export_data_statement, api_name="dataframe-to_parquet"
3106+
)
30953107
self._set_internal_query_job(query_job)
30963108

30973109
def to_dict(
@@ -3294,7 +3306,7 @@ def _prepare_export(
32943306
array_value = self._block.expr
32953307

32963308
new_col_labels, new_idx_labels = utils.get_standardized_ids(
3297-
self._block.column_labels, self.index.names
3309+
self._block.column_labels, self._block.index.names
32983310
)
32993311

33003312
columns = list(self._block.value_columns)
@@ -3331,7 +3343,7 @@ def _run_io_query(
33313343
session = self._block.expr.session
33323344
self._optimize_query_complexity()
33333345
export_array, id_overrides = self._prepare_export(
3334-
index=index, ordering_id=ordering_id
3346+
index=index and self._has_index, ordering_id=ordering_id
33353347
)
33363348

33373349
_, query_job = session._execute(

bigframes/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ class PreviewWarning(Warning):
4141

4242
class NullIndexError(ValueError):
4343
"""Object has no index."""
44+
45+
46+
class TimeTravelDisabledWarning(Warning):
47+
"""A query was reattempted without time travel."""

0 commit comments

Comments
 (0)